Skip to content

Commit

Permalink
Simplify get server creation (#2285)
Browse files Browse the repository at this point in the history
Co-authored-by: Dan Laine <daniel.laine@avalabs.org>
  • Loading branch information
StephenButtolph and danlaine committed Nov 13, 2023
1 parent baf0ef7 commit 7f70fcf
Show file tree
Hide file tree
Showing 14 changed files with 220 additions and 252 deletions.
132 changes: 72 additions & 60 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,23 +841,14 @@ func (m *manager) createAvalancheChain(
startupTracker := tracker.NewStartup(connectedBeacons, (3*bootstrapWeight+3)/4)
vdrs.RegisterCallbackListener(ctx.SubnetID, startupTracker)

snowmanCommonCfg := common.Config{
Ctx: ctx,
Beacons: vdrs,
SampleK: sampleK,
Alpha: bootstrapWeight/2 + 1, // must be > 50%
StartupTracker: startupTracker,
Sender: snowmanMessageSender,
BootstrapTracker: sb,
Timer: h,
RetryBootstrap: m.RetryBootstrap,
RetryBootstrapWarnFrequency: m.RetryBootstrapWarnFrequency,
MaxTimeGetAncestors: m.BootstrapMaxTimeGetAncestors,
AncestorsMaxContainersSent: m.BootstrapAncestorsMaxContainersSent,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
SharedCfg: &common.SharedConfig{},
}
snowGetHandler, err := snowgetter.New(vmWrappingProposerVM, snowmanCommonCfg)
snowGetHandler, err := snowgetter.New(
vmWrappingProposerVM,
snowmanMessageSender,
ctx.Log,
m.BootstrapMaxTimeGetAncestors,
m.BootstrapAncestorsMaxContainersSent,
ctx.Registerer,
)
if err != nil {
return nil, fmt.Errorf("couldn't initialize snow base message handler: %w", err)
}
Expand All @@ -870,10 +861,10 @@ func (m *manager) createAvalancheChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
snowmanEngineConfig := smeng.Config{
Ctx: snowmanCommonCfg.Ctx,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vmWrappingProposerVM,
Sender: snowmanCommonCfg.Sender,
Sender: snowmanMessageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: snowmanConsensus,
Expand All @@ -889,7 +880,20 @@ func (m *manager) createAvalancheChain(

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
Config: snowmanCommonCfg,
Config: common.Config{
Ctx: ctx,
Beacons: vdrs,
SampleK: sampleK,
Alpha: bootstrapWeight/2 + 1, // must be > 50%
StartupTracker: startupTracker,
Sender: snowmanMessageSender,
BootstrapTracker: sb,
Timer: h,
RetryBootstrap: m.RetryBootstrap,
RetryBootstrapWarnFrequency: m.RetryBootstrapWarnFrequency,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
SharedCfg: &common.SharedConfig{},
},
AllGetsServer: snowGetHandler,
Blocked: blockBlocker,
VM: vmWrappingProposerVM,
Expand All @@ -906,24 +910,14 @@ func (m *manager) createAvalancheChain(
snowmanBootstrapper = common.TraceBootstrapableEngine(snowmanBootstrapper, m.Tracer)
}

avalancheCommonCfg := common.Config{
Ctx: ctx,
Beacons: vdrs,
SampleK: sampleK,
StartupTracker: startupTracker,
Alpha: bootstrapWeight/2 + 1, // must be > 50%
Sender: avalancheMessageSender,
BootstrapTracker: sb,
Timer: h,
RetryBootstrap: m.RetryBootstrap,
RetryBootstrapWarnFrequency: m.RetryBootstrapWarnFrequency,
MaxTimeGetAncestors: m.BootstrapMaxTimeGetAncestors,
AncestorsMaxContainersSent: m.BootstrapAncestorsMaxContainersSent,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
SharedCfg: &common.SharedConfig{},
}

avaGetHandler, err := avagetter.New(vtxManager, avalancheCommonCfg)
avaGetHandler, err := avagetter.New(
vtxManager,
avalancheMessageSender,
ctx.Log,
m.BootstrapMaxTimeGetAncestors,
m.BootstrapAncestorsMaxContainersSent,
ctx.AvalancheRegisterer,
)
if err != nil {
return nil, fmt.Errorf("couldn't initialize avalanche base message handler: %w", err)
}
Expand All @@ -938,7 +932,20 @@ func (m *manager) createAvalancheChain(
_, specifiedLinearizationTime := version.CortinaTimes[ctx.NetworkID]
specifiedLinearizationTime = specifiedLinearizationTime && ctx.ChainID == m.XChainID
avalancheBootstrapperConfig := avbootstrap.Config{
Config: avalancheCommonCfg,
Config: common.Config{
Ctx: ctx,
Beacons: vdrs,
SampleK: sampleK,
StartupTracker: startupTracker,
Alpha: bootstrapWeight/2 + 1, // must be > 50%
Sender: avalancheMessageSender,
BootstrapTracker: sb,
Timer: h,
RetryBootstrap: m.RetryBootstrap,
RetryBootstrapWarnFrequency: m.RetryBootstrapWarnFrequency,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
SharedCfg: &common.SharedConfig{},
},
AllGetsServer: avaGetHandler,
VtxBlocked: vtxBlocker,
TxBlocked: txBlocker,
Expand Down Expand Up @@ -1190,24 +1197,14 @@ func (m *manager) createSnowmanChain(
startupTracker := tracker.NewStartup(connectedBeacons, (3*bootstrapWeight+3)/4)
beacons.RegisterCallbackListener(ctx.SubnetID, startupTracker)

commonCfg := common.Config{
Ctx: ctx,
Beacons: beacons,
SampleK: sampleK,
StartupTracker: startupTracker,
Alpha: bootstrapWeight/2 + 1, // must be > 50%
Sender: messageSender,
BootstrapTracker: sb,
Timer: h,
RetryBootstrap: m.RetryBootstrap,
RetryBootstrapWarnFrequency: m.RetryBootstrapWarnFrequency,
MaxTimeGetAncestors: m.BootstrapMaxTimeGetAncestors,
AncestorsMaxContainersSent: m.BootstrapAncestorsMaxContainersSent,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
SharedCfg: &common.SharedConfig{},
}

snowGetHandler, err := snowgetter.New(vm, commonCfg)
snowGetHandler, err := snowgetter.New(
vm,
messageSender,
ctx.Log,
m.BootstrapMaxTimeGetAncestors,
m.BootstrapAncestorsMaxContainersSent,
ctx.Registerer,
)
if err != nil {
return nil, fmt.Errorf("couldn't initialize snow base message handler: %w", err)
}
Expand All @@ -1220,14 +1217,14 @@ func (m *manager) createSnowmanChain(
// Create engine, bootstrapper and state-syncer in this order,
// to make sure start callbacks are duly initialized
engineConfig := smeng.Config{
Ctx: commonCfg.Ctx,
Ctx: ctx,
AllGetsServer: snowGetHandler,
VM: vm,
Sender: commonCfg.Sender,
Sender: messageSender,
Validators: vdrs,
Params: consensusParams,
Consensus: consensus,
PartialSync: m.PartialSyncPrimaryNetwork && commonCfg.Ctx.ChainID == constants.PlatformChainID,
PartialSync: m.PartialSyncPrimaryNetwork && ctx.ChainID == constants.PlatformChainID,
}
engine, err := smeng.New(engineConfig)
if err != nil {
Expand All @@ -1238,6 +1235,21 @@ func (m *manager) createSnowmanChain(
engine = smeng.TraceEngine(engine, m.Tracer)
}

commonCfg := common.Config{
Ctx: ctx,
Beacons: beacons,
SampleK: sampleK,
StartupTracker: startupTracker,
Alpha: bootstrapWeight/2 + 1, // must be > 50%
Sender: messageSender,
BootstrapTracker: sb,
Timer: h,
RetryBootstrap: m.RetryBootstrap,
RetryBootstrapWarnFrequency: m.RetryBootstrapWarnFrequency,
AncestorsMaxContainersReceived: m.BootstrapAncestorsMaxContainersReceived,
SharedCfg: &common.SharedConfig{},
}

// create bootstrap gear
bootstrapCfg := smbootstrap.Config{
Config: commonCfg,
Expand Down
4 changes: 2 additions & 2 deletions snow/engine/avalanche/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -102,12 +103,11 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *vertex.Te
Sender: sender,
BootstrapTracker: bootstrapTracker,
Timer: &common.TimerTest{},
AncestorsMaxContainersSent: 2000,
AncestorsMaxContainersReceived: 2000,
SharedCfg: &common.SharedConfig{},
}

avaGetHandler, err := getter.New(manager, commonConfig)
avaGetHandler, err := getter.New(manager, sender, ctx.Log, time.Second, 2000, ctx.AvalancheRegisterer)
require.NoError(err)

return Config{
Expand Down
41 changes: 26 additions & 15 deletions snow/engine/avalanche/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"

"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
Expand All @@ -25,30 +27,39 @@ import (
// Get requests are always served, regardless node state (bootstrapping or normal operations).
var _ common.AllGetsServer = (*getter)(nil)

func New(storage vertex.Storage, commonCfg common.Config) (common.AllGetsServer, error) {
func New(
storage vertex.Storage,
sender common.Sender,
log logging.Logger,
maxTimeGetAncestors time.Duration,
maxContainersGetAncestors int,
reg prometheus.Registerer,
) (common.AllGetsServer, error) {
gh := &getter{
storage: storage,
sender: commonCfg.Sender,
cfg: commonCfg,
log: commonCfg.Ctx.Log,
storage: storage,
sender: sender,
log: log,
maxTimeGetAncestors: maxTimeGetAncestors,
maxContainersGetAncestors: maxContainersGetAncestors,
}

var err error
gh.getAncestorsVtxs, err = metric.NewAverager(
"bs",
"get_ancestors_vtxs",
"vertices fetched in a call to GetAncestors",
commonCfg.Ctx.AvalancheRegisterer,
reg,
)
return gh, err
}

type getter struct {
storage vertex.Storage
sender common.Sender
cfg common.Config
storage vertex.Storage
sender common.Sender
log logging.Logger
maxTimeGetAncestors time.Duration
maxContainersGetAncestors int

log logging.Logger
getAncestorsVtxs metric.Averager
}

Expand Down Expand Up @@ -106,13 +117,13 @@ func (gh *getter) GetAncestors(ctx context.Context, nodeID ids.NodeID, requestID
return nil // Don't have the requested vertex. Drop message.
}

queue := make([]avalanche.Vertex, 1, gh.cfg.AncestorsMaxContainersSent) // for BFS
queue := make([]avalanche.Vertex, 1, gh.maxContainersGetAncestors) // for BFS
queue[0] = vertex
ancestorsBytesLen := 0 // length, in bytes, of vertex and its ancestors
ancestorsBytes := make([][]byte, 0, gh.cfg.AncestorsMaxContainersSent) // vertex and its ancestors in BFS order
visited := set.Of(vertex.ID()) // IDs of vertices that have been in queue before
ancestorsBytesLen := 0 // length, in bytes, of vertex and its ancestors
ancestorsBytes := make([][]byte, 0, gh.maxContainersGetAncestors) // vertex and its ancestors in BFS order
visited := set.Of(vertex.ID()) // IDs of vertices that have been in queue before

for len(ancestorsBytes) < gh.cfg.AncestorsMaxContainersSent && len(queue) > 0 && time.Since(startTime) < gh.cfg.MaxTimeGetAncestors {
for len(ancestorsBytes) < gh.maxContainersGetAncestors && len(queue) > 0 && time.Since(startTime) < gh.maxTimeGetAncestors {
var vtx avalanche.Vertex
vtx, queue = queue[0], queue[1:] // pop
vtxBytes := vtx.Bytes()
Expand Down

0 comments on commit 7f70fcf

Please sign in to comment.