Replace periodic push accepted gossip with pull preference gossip for block discovery #2367

Merged on Nov 29, 2023 (58 commits). Changes shown are from 22 commits.

Commits
90c5ea4
Replace periodic push gossip with pull gossip
StephenButtolph Nov 23, 2023
231f629
Add timestamps during accept
StephenButtolph Nov 23, 2023
d2365ba
lint
StephenButtolph Nov 23, 2023
cdf0406
Add metric for duration between block timestamp and acceptance time
StephenButtolph Nov 24, 2023
87bbdc1
merged
StephenButtolph Nov 24, 2023
05adad5
nit
StephenButtolph Nov 24, 2023
ca9e454
reduce diff
StephenButtolph Nov 24, 2023
edd6c67
adjust gossip breadth to account for increased frequency
StephenButtolph Nov 24, 2023
7cace9a
Separate new config
StephenButtolph Nov 24, 2023
4af40f4
cleanup
StephenButtolph Nov 24, 2023
25a7199
merged
StephenButtolph Nov 24, 2023
369134a
merged
StephenButtolph Nov 24, 2023
e2df3f0
merged
StephenButtolph Nov 24, 2023
3e9516d
fix tests
StephenButtolph Nov 24, 2023
008011c
increase number of queried nodes
StephenButtolph Nov 24, 2023
df92044
Send fetch preference rather than last accepted
StephenButtolph Nov 24, 2023
85cc98b
renames
StephenButtolph Nov 24, 2023
ceee9aa
nit
StephenButtolph Nov 24, 2023
77c035a
nit
StephenButtolph Nov 24, 2023
5891bba
nit skip gossip request when already polling
StephenButtolph Nov 24, 2023
23988e1
more gossip
StephenButtolph Nov 25, 2023
3918e21
nit
StephenButtolph Nov 25, 2023
380dde2
address nits
StephenButtolph Nov 26, 2023
f752ea8
Add metric to track the stake weight of block providers
StephenButtolph Nov 27, 2023
199848a
WIP add bimap
StephenButtolph Nov 27, 2023
618eac0
merged
StephenButtolph Nov 28, 2023
43faeb9
Implement generic bimap
StephenButtolph Nov 28, 2023
613c28b
docs
StephenButtolph Nov 28, 2023
332c70b
Merge branch 'dev' into bimap
StephenButtolph Nov 28, 2023
6cb48c4
tests
StephenButtolph Nov 28, 2023
01c77be
minimize diff
StephenButtolph Nov 28, 2023
cdf6f41
nit
StephenButtolph Nov 28, 2023
b69eb29
nit
StephenButtolph Nov 28, 2023
e5bd019
Unexport RequestID from snowman engine
StephenButtolph Nov 28, 2023
4f15756
Add block source metrics to monitor gossip
StephenButtolph Nov 28, 2023
86c4661
merged
StephenButtolph Nov 28, 2023
38f29fb
Merge branch 'dev' into add-stake-weight-metric
StephenButtolph Nov 28, 2023
de78ec8
Merge branch 'unexport-request-id' into track-block-request-origin
StephenButtolph Nov 28, 2023
66e9283
Merge branch 'track-block-request-origin' into gossip-source-metrics
StephenButtolph Nov 28, 2023
7cc241c
merged
StephenButtolph Nov 28, 2023
2f61dd1
fix
StephenButtolph Nov 28, 2023
81d6c00
Merge branch 'add-stake-weight-metric' of github.com:ava-labs/avalanc…
StephenButtolph Nov 28, 2023
3234dde
merged
StephenButtolph Nov 28, 2023
7b29a34
merged
StephenButtolph Nov 28, 2023
07058ea
merged
StephenButtolph Nov 28, 2023
285b645
fix merge
StephenButtolph Nov 28, 2023
bcdb9e9
nit
StephenButtolph Nov 28, 2023
43f4683
Merge branch 'track-block-request-origin' into gossip-source-metrics
StephenButtolph Nov 28, 2023
6b79747
Merge branch 'gossip-source-metrics' into replace-periodic-gossip-pre…
StephenButtolph Nov 28, 2023
6097f29
merged
StephenButtolph Nov 28, 2023
dab7a8f
Merge branch 'track-block-request-origin' into gossip-source-metrics
StephenButtolph Nov 28, 2023
6e254d3
Merge branch 'gossip-source-metrics' into replace-periodic-gossip-pre…
StephenButtolph Nov 28, 2023
06e791b
Use uniform pull gossip (#2388)
StephenButtolph Nov 29, 2023
ec35e27
merged
StephenButtolph Nov 29, 2023
83e5ec9
add comment
StephenButtolph Nov 29, 2023
b82b8e0
Merge branch 'dev' into replace-periodic-gossip-preference
StephenButtolph Nov 29, 2023
7ec8852
nit
StephenButtolph Nov 29, 2023
1db911f
Merge branch 'replace-periodic-gossip-preference' of github.com:ava-l…
StephenButtolph Nov 29, 2023
8 changes: 4 additions & 4 deletions chains/manager.go
@@ -205,8 +205,8 @@ type ManagerConfig struct {
MeterVMEnabled bool // Should each VM be wrapped with a MeterVM
Metrics metrics.MultiGatherer

AcceptedFrontierGossipFrequency time.Duration
ConsensusAppConcurrency int
FrontierGossipFrequency time.Duration
ConsensusAppConcurrency int

// Max Time to spend fetching a container and its
// ancestors when responding to a GetAncestors
@@ -824,7 +824,7 @@ func (m *manager) createAvalancheChain(
ctx,
vdrs,
msgChan,
m.AcceptedFrontierGossipFrequency,
m.FrontierGossipFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
validators.UnhandledSubnetConnector, // avalanche chains don't use subnet connector
@@ -1170,7 +1170,7 @@ func (m *manager) createSnowmanChain(
ctx,
vdrs,
msgChan,
m.AcceptedFrontierGossipFrequency,
m.FrontierGossipFrequency,
m.ConsensusAppConcurrency,
m.ResourceTracker,
subnetConnector,
14 changes: 9 additions & 5 deletions config/config.go
@@ -60,8 +60,9 @@ const (
subnetConfigFileExt = ".json"
ipResolutionTimeout = 30 * time.Second

ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
ipcDeprecationMsg = "IPC API is deprecated"
keystoreDeprecationMsg = "keystore API is deprecated"
acceptedFrontierGossipDeprecationMsg = "push based accepted frontier gossip is deprecated"
)

var (
@@ -72,6 +73,9 @@ var (
IpcsChainIDsKey: ipcDeprecationMsg,
IpcsPathKey: ipcDeprecationMsg,
KeystoreAPIEnabledKey: keystoreDeprecationMsg,
ConsensusGossipAcceptedFrontierValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierNonValidatorSizeKey: acceptedFrontierGossipDeprecationMsg,
ConsensusGossipAcceptedFrontierPeerSizeKey: acceptedFrontierGossipDeprecationMsg,
}

errSybilProtectionDisabledStakerWeights = errors.New("sybil protection disabled weights must be positive")
@@ -1320,9 +1324,9 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Gossiping
nodeConfig.AcceptedFrontierGossipFrequency = v.GetDuration(ConsensusAcceptedFrontierGossipFrequencyKey)
if nodeConfig.AcceptedFrontierGossipFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusAcceptedFrontierGossipFrequencyKey)
nodeConfig.FrontierGossipFrequency = v.GetDuration(ConsensusGossipFrontierFrequencyKey)
if nodeConfig.FrontierGossipFrequency < 0 {
return node.Config{}, fmt.Errorf("%s must be >= 0", ConsensusGossipFrontierFrequencyKey)
}

// App handling
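
Note on the config/config.go hunks above: the three accepted-frontier size keys are added to the deprecated-key map, and the renamed frequency value keeps its non-negative check. The following is a minimal sketch of how such a map and check might be consulted. The helper names `deprecationWarnings` and `validateFrequency` are hypothetical and do not appear in the PR; the key strings and the message text are copied from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// Message text copied from the diff.
const acceptedFrontierGossipDeprecationMsg = "push based accepted frontier gossip is deprecated"

// deprecatedKeys holds only the three entries this PR adds (flag names from config/keys.go).
var deprecatedKeys = map[string]string{
	"consensus-accepted-frontier-gossip-validator-size":     acceptedFrontierGossipDeprecationMsg,
	"consensus-accepted-frontier-gossip-non-validator-size": acceptedFrontierGossipDeprecationMsg,
	"consensus-accepted-frontier-gossip-peer-size":          acceptedFrontierGossipDeprecationMsg,
}

// deprecationWarnings is a hypothetical helper: it returns one warning for
// each user-supplied key that appears in deprecatedKeys, in input order.
func deprecationWarnings(setKeys []string) []string {
	var warnings []string
	for _, key := range setKeys {
		if msg, ok := deprecatedKeys[key]; ok {
			warnings = append(warnings, fmt.Sprintf("%s: %s", key, msg))
		}
	}
	return warnings
}

// validateFrequency is a hypothetical helper that applies the same rule as
// the check in GetNodeConfig: a negative duration is rejected.
func validateFrequency(key string, d time.Duration) error {
	if d < 0 {
		return fmt.Errorf("%s must be >= 0", key)
	}
	return nil
}

func main() {
	keys := []string{
		"consensus-accepted-frontier-gossip-peer-size",
		"consensus-frontier-gossip-frequency",
	}
	for _, w := range deprecationWarnings(keys) {
		fmt.Println("warning:", w)
	}
	fmt.Println(validateFrequency("consensus-frontier-gossip-frequency", 100*time.Millisecond))
}
```

Only the deprecated size key produces a warning here; the renamed frequency key is not in the map, and zero remains a valid frequency under the `>= 0` rule.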
2 changes: 1 addition & 1 deletion config/flags.go
@@ -177,9 +177,9 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(BenchlistMinFailingDurationKey, constants.DefaultBenchlistMinFailingDuration, "Minimum amount of time messages to a peer must be failing before the peer is benched")

// Router
fs.Duration(ConsensusAcceptedFrontierGossipFrequencyKey, constants.DefaultAcceptedFrontierGossipFrequency, "Frequency of gossiping accepted frontiers")
fs.Uint(ConsensusAppConcurrencyKey, constants.DefaultConsensusAppConcurrency, "Maximum number of goroutines to use when handling App messages on a chain")
fs.Duration(ConsensusShutdownTimeoutKey, constants.DefaultConsensusShutdownTimeout, "Timeout before killing an unresponsive chain")
fs.Duration(ConsensusGossipFrontierFrequencyKey, constants.DefaultFrontierGossipFrequency, "Frequency of polling and 10%% of gossiping of frontiers")
fs.Uint(ConsensusGossipAcceptedFrontierValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierValidatorSize, "Number of validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierNonValidatorSizeKey, constants.DefaultConsensusGossipAcceptedFrontierNonValidatorSize, "Number of non-validators to gossip to when gossiping accepted frontier")
fs.Uint(ConsensusGossipAcceptedFrontierPeerSizeKey, constants.DefaultConsensusGossipAcceptedFrontierPeerSize, "Number of peers to gossip to when gossiping accepted frontier")
4 changes: 2 additions & 2 deletions config/keys.go
@@ -143,8 +143,9 @@ const (
IpcsChainIDsKey = "ipcs-chain-ids"
IpcsPathKey = "ipcs-path"
MeterVMsEnabledKey = "meter-vms-enabled"
ConsensusAcceptedFrontierGossipFrequencyKey = "consensus-accepted-frontier-gossip-frequency"
ConsensusAppConcurrencyKey = "consensus-app-concurrency"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ConsensusGossipFrontierFrequencyKey = "consensus-frontier-gossip-frequency"
ConsensusGossipAcceptedFrontierValidatorSizeKey = "consensus-accepted-frontier-gossip-validator-size"
ConsensusGossipAcceptedFrontierNonValidatorSizeKey = "consensus-accepted-frontier-gossip-non-validator-size"
ConsensusGossipAcceptedFrontierPeerSizeKey = "consensus-accepted-frontier-gossip-peer-size"
@@ -154,7 +155,6 @@ const (
AppGossipValidatorSizeKey = "consensus-app-gossip-validator-size"
AppGossipNonValidatorSizeKey = "consensus-app-gossip-non-validator-size"
AppGossipPeerSizeKey = "consensus-app-gossip-peer-size"
ConsensusShutdownTimeoutKey = "consensus-shutdown-timeout"
ProposerVMUseCurrentHeightKey = "proposervm-use-current-height"
FdLimitKey = "fd-limit"
IndexEnabledKey = "index-enabled"
4 changes: 2 additions & 2 deletions node/config.go
@@ -181,8 +181,8 @@ type Config struct {
ConsensusRouter router.Router `json:"-"`
RouterHealthConfig router.HealthConfig `json:"routerHealthConfig"`
ConsensusShutdownTimeout time.Duration `json:"consensusShutdownTimeout"`
// Gossip a container in the accepted frontier every [AcceptedFrontierGossipFrequency]
AcceptedFrontierGossipFrequency time.Duration `json:"consensusGossipFreq"`
// Gossip and query frontiers every [FrontierGossipFrequency]
FrontierGossipFrequency time.Duration `json:"consensusGossipFreq"`
// ConsensusAppConcurrency defines the maximum number of goroutines to
// handle App messages per chain.
ConsensusAppConcurrency int `json:"consensusAppConcurrency"`
2 changes: 1 addition & 1 deletion node/node.go
@@ -1014,7 +1014,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {
Metrics: n.MetricsGatherer,
SubnetConfigs: n.Config.SubnetConfigs,
ChainConfigs: n.Config.ChainConfigs,
AcceptedFrontierGossipFrequency: n.Config.AcceptedFrontierGossipFrequency,
FrontierGossipFrequency: n.Config.FrontierGossipFrequency,
ConsensusAppConcurrency: n.Config.ConsensusAppConcurrency,
BootstrapMaxTimeGetAncestors: n.Config.BootstrapMaxTimeGetAncestors,
BootstrapAncestorsMaxContainersSent: n.Config.BootstrapAncestorsMaxContainersSent,
112 changes: 78 additions & 34 deletions snow/engine/snowman/transitive.go
@@ -31,7 +31,10 @@ import (
"github.com/ava-labs/avalanchego/utils/wrappers"
)

const nonVerifiedCacheSize = 64 * units.MiB
const (
nonVerifiedCacheSize = 64 * units.MiB
putGossipPeriod = 10
)

var _ Engine = (*Transitive)(nil)

@@ -58,7 +61,9 @@ type Transitive struct {
common.AppHandler
validators.Connector

RequestID uint32
requestID uint32

gossipCounter int

// track outstanding preference requests
polls poll.Set
@@ -145,6 +150,66 @@ func newTransitive(config Config) (*Transitive, error) {
return t, t.metrics.Initialize("", config.Ctx.Registerer)
}

func (t *Transitive) Gossip(ctx context.Context) error {
lastAcceptedID, lastAcceptedHeight := t.Consensus.LastAccepted()
if numProcessing := t.Consensus.NumProcessing(); numProcessing == 0 {
t.Ctx.Log.Verbo("sampling from validators",
zap.Stringer("validators", t.Validators),
)

vdrIDs, err := t.Validators.Sample(t.Ctx.SubnetID, 1)
if err != nil {
t.Ctx.Log.Error("skipping block gossip",
zap.String("reason", "no validators"),
)
return nil
}

nextHeightToAccept, err := math.Add64(lastAcceptedHeight, 1)
if err != nil {
t.Ctx.Log.Error("skipping block gossip",
zap.String("reason", "block height overflow"),
zap.Stringer("blkID", lastAcceptedID),
zap.Uint64("lastAcceptedHeight", lastAcceptedHeight),
zap.Error(err),
)
return nil
}

t.requestID++
vdrSet := set.Of(vdrIDs...)
preferredID := t.Consensus.Preference()
t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, preferredID, nextHeightToAccept)
} else {
t.Ctx.Log.Debug("skipping block gossip",
zap.String("reason", "blocks currently processing"),
zap.Int("numProcessing", numProcessing),
)
}

// TODO: Remove periodic push gossip after v1.11.x is activated
t.gossipCounter++
t.gossipCounter %= putGossipPeriod
if t.gossipCounter > 0 {
return nil
}

lastAccepted, err := t.GetBlock(ctx, lastAcceptedID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", lastAcceptedID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", lastAcceptedID),
)
t.Sender.SendGossip(ctx, lastAccepted.Bytes())
return nil
}

func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint32, blkBytes []byte) error {
blk, err := t.VM.ParseBlock(ctx, blkBytes)
if err != nil {
@@ -346,28 +411,6 @@ func (*Transitive) Timeout(context.Context) error {
return nil
}

func (t *Transitive) Gossip(ctx context.Context) error {
blkID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
}

blk, err := t.GetBlock(ctx, blkID)
if err != nil {
t.Ctx.Log.Warn("dropping gossip request",
zap.String("reason", "block couldn't be loaded"),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
return nil
}
t.Ctx.Log.Verbo("gossiping accepted block to the network",
zap.Stringer("blkID", blkID),
)
t.Sender.SendGossip(ctx, blk.Bytes())
return nil
}

func (*Transitive) Halt(context.Context) {}

func (t *Transitive) Shutdown(ctx context.Context) error {
@@ -401,7 +444,7 @@ func (t *Transitive) Context() *snow.ConsensusContext {
}

func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
t.RequestID = startReqID
t.requestID = startReqID
lastAcceptedID, err := t.VM.LastAccepted(ctx)
if err != nil {
return err
@@ -770,14 +813,14 @@ func (t *Transitive) sendRequest(ctx context.Context, nodeID ids.NodeID, blkID i
return
}

t.RequestID++
t.blkReqs.Add(nodeID, t.RequestID, blkID)
t.requestID++
t.blkReqs.Add(nodeID, t.requestID, blkID)
t.Ctx.Log.Verbo("sending Get request",
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", t.RequestID),
zap.Uint32("requestID", t.requestID),
zap.Stringer("blkID", blkID),
)
t.Sender.SendGet(ctx, nodeID, t.RequestID, blkID)
t.Sender.SendGet(ctx, nodeID, t.requestID, blkID)

// Tracks performance statistics
t.metrics.numRequests.Set(float64(t.blkReqs.Len()))
@@ -801,6 +844,7 @@ func (t *Transitive) sendQuery(
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "insufficient number of validators"),
zap.Stringer("blkID", blkID),
zap.Int("size", t.Params.K),
)
return
}
@@ -818,21 +862,21 @@ func (t *Transitive) sendQuery(
}

vdrBag := bag.Of(vdrIDs...)
t.RequestID++
if !t.polls.Add(t.RequestID, vdrBag) {
t.requestID++
if !t.polls.Add(t.requestID, vdrBag) {
t.Ctx.Log.Error("dropped query for block",
zap.String("reason", "failed to add poll"),
zap.Stringer("blkID", blkID),
zap.Uint32("requestID", t.RequestID),
zap.Uint32("requestID", t.requestID),
)
return
}

vdrSet := set.Of(vdrIDs...)
if push {
t.Sender.SendPushQuery(ctx, vdrSet, t.RequestID, blkBytes, nextHeightToAccept)
t.Sender.SendPushQuery(ctx, vdrSet, t.requestID, blkBytes, nextHeightToAccept)
} else {
t.Sender.SendPullQuery(ctx, vdrSet, t.RequestID, blkID, nextHeightToAccept)
t.Sender.SendPullQuery(ctx, vdrSet, t.requestID, blkID, nextHeightToAccept)
}
}
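
Note on the new `Gossip` method in snow/engine/snowman/transitive.go above: every call may issue a pull query, but the legacy push gossip runs only when `gossipCounter` wraps to zero, which is once per `putGossipPeriod` calls. The sketch below isolates that counter logic. The `gossiper` type and the `tick` and `countPushes` helpers are hypothetical stand-ins for the engine, and all network and consensus calls are omitted.

```go
package main

import "fmt"

// putGossipPeriod mirrors the constant added in the diff.
const putGossipPeriod = 10

// gossiper is a hypothetical stand-in for Transitive; only the counter is modeled.
type gossiper struct {
	gossipCounter int
}

// tick models one Gossip call and reports whether the push-gossip step
// would be reached, using the same increment-then-modulo logic as the diff.
func (g *gossiper) tick() bool {
	g.gossipCounter++
	g.gossipCounter %= putGossipPeriod
	return g.gossipCounter == 0
}

// countPushes returns how many of the first n Gossip calls reach the push step.
func countPushes(n int) int {
	g := &gossiper{}
	pushes := 0
	for i := 0; i < n; i++ {
		if g.tick() {
			pushes++
		}
	}
	return pushes
}

func main() {
	for _, n := range []int{9, 10, 25} {
		fmt.Printf("calls=%d pushes=%d\n", n, countPushes(n))
	}
}
```

Because the counter starts at zero and is incremented before the modulo, the first push happens on the tenth call rather than the first.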

12 changes: 6 additions & 6 deletions snow/engine/snowman/transitive_test.go
@@ -1294,7 +1294,7 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
func TestEngineGossip(t *testing.T) {
require := require.New(t)

_, _, sender, vm, te, gBlk := setupDefaultConfig(t)
nodeID, _, sender, vm, te, gBlk := setupDefaultConfig(t)

vm.LastAcceptedF = func(context.Context) (ids.ID, error) {
return gBlk.ID(), nil
@@ -1304,15 +1304,15 @@ func TestEngineGossip(t *testing.T) {
return gBlk, nil
}

called := new(bool)
sender.SendGossipF = func(_ context.Context, blkBytes []byte) {
*called = true
require.Equal(gBlk.Bytes(), blkBytes)
var calledSendPullQuery bool
sender.SendPullQueryF = func(_ context.Context, nodeIDs set.Set[ids.NodeID], _ uint32, _ ids.ID, _ uint64) {
calledSendPullQuery = true
require.Equal(set.Of(nodeID), nodeIDs)
}

require.NoError(te.Gossip(context.Background()))

require.True(*called)
require.True(calledSendPullQuery)
}

func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
4 changes: 2 additions & 2 deletions utils/constants/networking.go
@@ -71,12 +71,12 @@ const (
DefaultBenchlistMinFailingDuration = 2*time.Minute + 30*time.Second

// Router
DefaultAcceptedFrontierGossipFrequency = 10 * time.Second
DefaultConsensusAppConcurrency = 2
DefaultConsensusShutdownTimeout = time.Minute
DefaultFrontierGossipFrequency = 100 * time.Millisecond
DefaultConsensusGossipAcceptedFrontierValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierNonValidatorSize = 0
DefaultConsensusGossipAcceptedFrontierPeerSize = 15
DefaultConsensusGossipAcceptedFrontierPeerSize = 1
DefaultConsensusGossipOnAcceptValidatorSize = 0
DefaultConsensusGossipOnAcceptNonValidatorSize = 0
DefaultConsensusGossipOnAcceptPeerSize = 10
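
Note on the new defaults in utils/constants/networking.go above: the tick interval drops from 10 s to 100 ms while the accepted-frontier peer size drops from 15 to 1. The arithmetic below is a rough sketch of the resulting per-node message rates. It assumes each push step contacts exactly the configured peer size, and that a pull query goes to one sampled validator on every tick where no blocks are processing (the upper bound). The helper names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// putGossipPeriod mirrors the constant in transitive.go: one push per 10 ticks.
const putGossipPeriod = 10

// perSecond converts "peers contacted once per interval" into peers per second.
func perSecond(peers int, interval time.Duration) float64 {
	return float64(peers) / interval.Seconds()
}

// oldPushRate uses the previous defaults: 15 peers every 10 s.
func oldPushRate() float64 {
	return perSecond(15, 10*time.Second)
}

// newPushRate uses the new defaults: 1 peer every putGossipPeriod ticks of 100 ms.
func newPushRate() float64 {
	return perSecond(1, putGossipPeriod*(100*time.Millisecond))
}

// maxPullRate is the upper bound on pull queries: 1 validator per 100 ms tick.
func maxPullRate() float64 {
	return perSecond(1, 100*time.Millisecond)
}

func main() {
	fmt.Printf("old push: %.2f peers/s\n", oldPushRate())
	fmt.Printf("new push: %.2f peers/s\n", newPushRate())
	fmt.Printf("max pull: %.2f queries/s\n", maxPullRate())
}
```

Under these assumptions the push rate falls from 1.5 to 1 peer per second, while pull queries are capped at 10 per second and are skipped whenever blocks are already processing.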