From 3544aa459c11ba85527878081852abdf7b0257aa Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Fri, 28 Aug 2020 16:16:51 -0400 Subject: [PATCH 1/4] Add command line parameter to set max non staker msgs cap --- chains/manager.go | 51 ++++++++------ main/params.go | 1 + node/config.go | 19 ++--- node/node.go | 1 + snow/networking/router/chain_router_test.go | 2 + snow/networking/router/handler.go | 5 +- snow/networking/router/handler_test.go | 3 + snow/networking/router/service_queue.go | 3 +- snow/networking/router/service_queue_test.go | 3 + snow/networking/sender/sender_test.go | 3 + snow/networking/throttler/ewma.go | 39 +++++++---- snow/networking/throttler/throttler_test.go | 74 ++++++++++++++------ vms/platformvm/vm_test.go | 1 + vms/spchainvm/consensus_benchmark_test.go | 2 + 14 files changed, 136 insertions(+), 71 deletions(-) diff --git a/chains/manager.go b/chains/manager.go index 2b48fe1c85b5..e799c3271dba 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -115,6 +115,7 @@ type manager struct { stakingEnabled bool // True iff the network has staking enabled stakerMsgPortion, stakerCPUPortion float64 + maxNonStakerPendingMsgs uint32 log logging.Logger logFactory logging.Factory vmManager vms.Manager // Manage mappings from vm ID --> vm @@ -152,6 +153,7 @@ type manager struct { // TODO: Make this function take less arguments func New( stakingEnabled bool, + maxNonStakerPendingMsgs uint, stakerMsgPortion, stakerCPUPortion float64, log logging.Logger, @@ -186,29 +188,30 @@ func New( rtr.Initialize(log, &timeoutManager, gossipFrequency, shutdownTimeout) m := &manager{ - stakingEnabled: stakingEnabled, - stakerMsgPortion: stakerMsgPortion, - stakerCPUPortion: stakerCPUPortion, - log: log, - logFactory: logFactory, - vmManager: vmManager, - decisionEvents: decisionEvents, - consensusEvents: consensusEvents, - db: db, - chainRouter: rtr, - net: net, - timeoutManager: &timeoutManager, - consensusParams: consensusParams, - validators: validators, - nodeID: nodeID, - networkID: networkID, - server: server, - keystore: keystore, - atomicMemory: atomicMemory, - avaxAssetID: avaxAssetID, - xChainID: xChainID, - criticalChains: criticalChains, - chains: make(map[[32]byte]*router.Handler), + stakingEnabled: stakingEnabled, + maxNonStakerPendingMsgs: uint32(maxNonStakerPendingMsgs), + stakerMsgPortion: stakerMsgPortion, + stakerCPUPortion: stakerCPUPortion, + log: log, + logFactory: logFactory, + vmManager: vmManager, + decisionEvents: decisionEvents, + consensusEvents: consensusEvents, + db: db, + chainRouter: rtr, + net: net, + timeoutManager: &timeoutManager, + consensusParams: consensusParams, + validators: validators, + nodeID: nodeID, + networkID: networkID, + server: server, + keystore: keystore, + atomicMemory: atomicMemory, + avaxAssetID: avaxAssetID, + xChainID: xChainID, + criticalChains: criticalChains, + chains: make(map[[32]byte]*router.Handler), } m.Initialize() return m, nil @@ -510,6 +513,7 @@ func (m *manager) createAvalancheChain( validators, msgChan, defaultChannelSize, + m.maxNonStakerPendingMsgs, m.stakerMsgPortion, m.stakerCPUPortion, fmt.Sprintf("%s_handler", consensusParams.Namespace), @@ -588,6 +592,7 @@ func (m *manager) createSnowmanChain( validators, msgChan, defaultChannelSize, + m.maxNonStakerPendingMsgs, m.stakerMsgPortion, m.stakerCPUPortion, fmt.Sprintf("%s_handler", consensusParams.Namespace), diff --git a/main/params.go b/main/params.go index de74f278fad9..03997835749e 100644 --- a/main/params.go +++ b/main/params.go @@ -202,6 +202,7 @@ func init() { 
fs.Uint64Var(&Config.DisabledStakingWeight, "staking-disabled-weight", 1, "Weight to provide to each peer when staking is disabled") // Throttling: + fs.UintVar(&Config.MaxNonStakerPendingMsgs, "max-non-staker-pending", 3, "Maximum number of messages a non-staker is allowed to have pending.") fs.Float64Var(&Config.StakerMsgPortion, "staker-msg-reserved", 0.2, "Reserve a portion of the chain message queue's space for stakers.") fs.Float64Var(&Config.StakerCPUPortion, "staker-cpu-reserved", 0.2, "Reserve a portion of the chain's CPU time for stakers.") diff --git a/node/config.go b/node/config.go index aed3c9cec727..5e6ad9f8dcec 100644 --- a/node/config.go +++ b/node/config.go @@ -36,15 +36,16 @@ type Config struct { DB database.Database // Staking configuration - StakingIP utils.IPDesc - StakingLocalPort uint16 - EnableP2PTLS bool - EnableStaking bool - StakingKeyFile string - StakingCertFile string - DisabledStakingWeight uint64 - StakerMsgPortion float64 - StakerCPUPortion float64 + StakingIP utils.IPDesc + StakingLocalPort uint16 + EnableP2PTLS bool + EnableStaking bool + StakingKeyFile string + StakingCertFile string + DisabledStakingWeight uint64 + MaxNonStakerPendingMsgs uint + StakerMsgPortion float64 + StakerCPUPortion float64 // Bootstrapping configuration BootstrapPeers []*Peer diff --git a/node/node.go b/node/node.go index a073358548d4..ae8ed261641d 100644 --- a/node/node.go +++ b/node/node.go @@ -411,6 +411,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { n.chainManager, err = chains.New( n.Config.EnableStaking, + n.Config.MaxNonStakerPendingMsgs, n.Config.StakerMsgPortion, n.Config.StakerCPUPortion, n.Log, diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index 3ab075a7b53b..8fb89e9cb322 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -39,6 +39,7 @@ func TestShutdown(t *testing.T) { validators.NewSet(), nil, 1, + DefaultMaxNonStakerPendingMsgs, DefaultStakerPortion, DefaultStakerPortion, "", @@ -97,6 +98,7 @@ func TestShutdownTimesOut(t *testing.T) { validators.NewSet(), nil, 1, + DefaultMaxNonStakerPendingMsgs, DefaultStakerPortion, DefaultStakerPortion, "", diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index f52f8cbc0b82..5f58e4e49edb 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -18,7 +18,8 @@ import ( ) const ( - DefaultStakerPortion float64 = 0.2 + DefaultStakerPortion float64 = 0.2 + DefaultMaxNonStakerPendingMsgs uint32 = 3 ) // Requirement: A set of nodes spamming messages (potentially costly) shouldn't @@ -117,6 +118,7 @@ func (h *Handler) Initialize( validators validators.Set, msgChan <-chan common.Message, bufferSize int, + maxNonStakerPendingMsgs uint32, stakerMsgPortion, stakerCPUPortion float64, namespace string, @@ -156,6 +158,7 @@ func (h *Handler) Initialize( consumptionRanges, consumptionAllotments, bufferSize, + maxNonStakerPendingMsgs, cpuInterval, stakerMsgPortion, stakerCPUPortion, diff --git a/snow/networking/router/handler_test.go b/snow/networking/router/handler_test.go index e7fdb1a3357c..14aee95ad413 100644 --- a/snow/networking/router/handler_test.go +++ b/snow/networking/router/handler_test.go @@ -41,6 +41,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { vdrs, nil, 16, + DefaultMaxNonStakerPendingMsgs, DefaultStakerPortion, DefaultStakerPortion, "", @@ -83,6 +84,7 @@ func TestHandlerDoesntDrop(t *testing.T) { validators, nil, 16, + 
DefaultMaxNonStakerPendingMsgs, DefaultStakerPortion, DefaultStakerPortion, "", @@ -118,6 +120,7 @@ func TestHandlerClosesOnError(t *testing.T) { validators.NewSet(), nil, 16, + DefaultMaxNonStakerPendingMsgs, DefaultStakerPortion, DefaultStakerPortion, "", diff --git a/snow/networking/router/service_queue.go b/snow/networking/router/service_queue.go index e054129465b6..5fe5b47a6182 100644 --- a/snow/networking/router/service_queue.go +++ b/snow/networking/router/service_queue.go @@ -60,13 +60,14 @@ func newMultiLevelQueue( consumptionRanges []float64, consumptionAllotments []time.Duration, bufferSize int, + maxNonStakerPendingMsgs uint32, cpuInterval time.Duration, msgPortion, cpuPortion float64, ) (messageQueue, chan struct{}) { semaChan := make(chan struct{}, bufferSize) singleLevelSize := bufferSize / len(consumptionRanges) - throttler := throttler.NewEWMAThrottler(vdrs, uint32(bufferSize), msgPortion, cpuPortion, cpuInterval, log) + throttler := throttler.NewEWMAThrottler(vdrs, uint32(bufferSize), maxNonStakerPendingMsgs, msgPortion, cpuPortion, cpuInterval, log) queues := make([]singleLevelQueue, len(consumptionRanges)) for index := 0; index < len(queues); index++ { gauge, histogram, err := metrics.registerTierStatistics(index) diff --git a/snow/networking/router/service_queue_test.go b/snow/networking/router/service_queue_test.go index b79098864695..8ef87f0050c7 100644 --- a/snow/networking/router/service_queue_test.go +++ b/snow/networking/router/service_queue_test.go @@ -42,6 +42,7 @@ func setupMultiLevelQueue(t *testing.T, bufferSize int) (messageQueue, chan stru consumptionRanges, consumptionAllotments, bufferSize, + DefaultMaxNonStakerPendingMsgs, time.Second, DefaultStakerPortion, DefaultStakerPortion, @@ -169,6 +170,7 @@ func TestMultiLevelQueuePrioritizes(t *testing.T) { consumptionRanges, consumptionAllotments, bufferSize, + DefaultMaxNonStakerPendingMsgs, time.Second, DefaultStakerPortion, DefaultStakerPortion, @@ -263,6 +265,7 @@ func TestMultiLevelQueuePushesDownOldMessages(t *testing.T) { consumptionRanges, consumptionAllotments, bufferSize, + DefaultMaxNonStakerPendingMsgs, time.Second, DefaultStakerPortion, DefaultStakerPortion, diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 9b296b8f3f2e..08f6b509d226 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -67,6 +67,7 @@ func TestTimeout(t *testing.T) { validators.NewSet(), nil, 1, + router.DefaultMaxNonStakerPendingMsgs, router.DefaultStakerPortion, router.DefaultStakerPortion, "", @@ -123,6 +124,7 @@ func TestReliableMessages(t *testing.T) { validators.NewSet(), nil, 1, + router.DefaultMaxNonStakerPendingMsgs, router.DefaultStakerPortion, router.DefaultStakerPortion, "", @@ -189,6 +191,7 @@ func TestReliableMessagesToMyself(t *testing.T) { validators.NewSet(), nil, 1, + router.DefaultMaxNonStakerPendingMsgs, router.DefaultStakerPortion, router.DefaultStakerPortion, "", diff --git a/snow/networking/throttler/ewma.go b/snow/networking/throttler/ewma.go index a8068c553cf0..0ea9098c15cd 100644 --- a/snow/networking/throttler/ewma.go +++ b/snow/networking/throttler/ewma.go @@ -30,13 +30,19 @@ type ewmaThrottler struct { vdrs validators.Set // Track CPU utilization - decayFactor float64 - stakerCPU, nonReservedCPU time.Duration + decayFactor float64 // Factor used to discount the EWMA at every period + stakerCPU time.Duration // Amount of CPU time reserved for stakers + nonReservedCPU time.Duration // Amount of CPU time that is not 
reserved for stakers // Track pending messages - reservedStakerMessages uint32 - pendingNonReservedMsgs, nonReservedMsgs uint32 - maxNonStakerPendingMsgs uint32 + reservedStakerMessages uint32 // Number of messages reserved for stakers + nonReservedMsgs uint32 // Number of non-reserved messages left to a shared message pool + pendingNonReservedMsgs uint32 // Number of pending messages taken from the shared message pool + + // Threshold of messages taken from the pool before the throttler begins to enforce hard caps on individual peers' pending messages + enforceIndividualCapThreshold uint32 + // Cap on number of pending messages allowed to a non-staker (not enforced until above [enforceIndividualCapThreshold] is exceeded) + maxNonStakerPendingMsgs uint32 // Statistics adjusted at every interval currentPeriod uint32 @@ -55,7 +61,8 @@ type ewmaThrottler struct { // which is not the limit since it tracks consumption using EWMA. func NewEWMAThrottler( vdrs validators.Set, - maxMessages uint32, + maxMessages, + maxNonStakerPendingMsgs uint32, stakerMsgPortion, stakerCPUPortion float64, period time.Duration, @@ -89,8 +96,10 @@ func NewEWMAThrottler( stakerCPU: stakerCPU, nonReservedCPU: nonReservedCPU, - reservedStakerMessages: reservedStakerMessages, - nonReservedMsgs: nonReservedMsgs, + reservedStakerMessages: reservedStakerMessages, + nonReservedMsgs: nonReservedMsgs, + enforceIndividualCapThreshold: nonReservedMsgs / 2, // If the pool is half empty, begin to enforce the max message caps + maxNonStakerPendingMsgs: maxNonStakerPendingMsgs, } // Add validators to spenders, so that they will be calculated correctly in @@ -161,7 +170,8 @@ func (et *ewmaThrottler) GetUtilization( sp := et.getSpender(validatorID) if !sp.staking { exceedsMessageAllotment := et.pendingNonReservedMsgs > et.nonReservedMsgs || // the shared message pool has been taken - sp.pendingMessages > sp.maxMessages // exceeds its own individual message cap + (sp.pendingMessages > sp.maxMessages && // Spender has exceeded its individual cap + et.pendingNonReservedMsgs > et.enforceIndividualCapThreshold) // And the threshold before enforcing the cap has been reached if exceedsMessageAllotment { et.log.Verbo("Throttling non-staker %s: %s. Pending pool messages: %d/%d.", @@ -176,9 +186,10 @@ func (et *ewmaThrottler) GetUtilization( // Staker should only be throttled if it has exceeded its message allotment // and there are either no messages left in the shared pool or it has // exceeded its own maximum message allocation. - exceedsMessageAllotment := sp.pendingMessages > sp.msgAllotment && // exceeds its own individual message allotment - (et.pendingNonReservedMsgs > et.nonReservedMsgs || // no unreserved messages - sp.pendingMessages > sp.maxMessages) // exceeds its own individual message cap + exceedsMessageAllotment := sp.pendingMessages > sp.msgAllotment && // Throttle if the staker has exceeded its allotment + (et.pendingNonReservedMsgs > et.nonReservedMsgs || // And either the shared message pool is empty + (et.pendingNonReservedMsgs > et.enforceIndividualCapThreshold && // Or the threshold before enforcing the cap has been reached + sp.pendingMessages > sp.maxMessages)) // and this staker has exceeded its individual cap if exceedsMessageAllotment { et.log.Debug("Throttling staker %s: %s. 
Pending pool messages: %d/%d.", @@ -198,8 +209,6 @@ func (et *ewmaThrottler) EndInterval() { et.cumulativeEWMA = time.Duration(float64(et.cumulativeEWMA) / et.decayFactor) stakingWeight := et.vdrs.Weight() - numPeers := et.vdrs.Len() + 1 - et.maxNonStakerPendingMsgs = et.nonReservedMsgs / uint32(numPeers) for key, spender := range et.spenders { spender.cpuEWMA = time.Duration(float64(spender.cpuEWMA) / et.decayFactor) @@ -209,7 +218,7 @@ func (et *ewmaThrottler) EndInterval() { // Calculate staker allotment here spender.staking = true spender.msgAllotment = uint32(float64(et.reservedStakerMessages) * stakerPortion) - spender.maxMessages = uint32(float64(et.reservedStakerMessages)*stakerPortion) + et.maxNonStakerPendingMsgs + spender.maxMessages = spender.msgAllotment + et.maxNonStakerPendingMsgs spender.expectedCPU = time.Duration(float64(et.stakerCPU)*stakerPortion) + defaultMinimumCPUAllotment continue } diff --git a/snow/networking/throttler/throttler_test.go b/snow/networking/throttler/throttler_test.go index 835385e87dc2..c7d3d2cc54ab 100644 --- a/snow/networking/throttler/throttler_test.go +++ b/snow/networking/throttler/throttler_test.go @@ -12,6 +12,10 @@ import ( "github.com/ava-labs/gecko/utils/logging" ) +const ( + defaultMaxNonStakerPendingMsgs uint32 = 3 +) + func TestEWMAThrottler(t *testing.T) { vdrs := validators.NewSet() validator0 := validators.GenerateRandomValidator(1) @@ -23,7 +27,7 @@ func TestEWMAThrottler(t *testing.T) { msgPortion := 0.25 cpuPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, cpuPortion, period, logging.NoLog{}) throttler.UtilizeCPU(validator0.ID(), 25*time.Millisecond) throttler.UtilizeCPU(validator1.ID(), 5*time.Second) @@ -68,7 +72,7 @@ func TestThrottlerPrunesSpenders(t *testing.T) { cpuPortion := 0.25 msgPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, cpuPortion, period, logging.NoLog{}) throttler.AddMessage(nonStaker2) // nonStaker2 should not be removed with a pending message throttler.UtilizeCPU(nonStaker0, 1.0) @@ -107,40 +111,66 @@ func TestThrottleStaker(t *testing.T) { staker0 := validators.GenerateRandomValidator(1) staker1 := validators.GenerateRandomValidator(1) nonStaker0 := ids.NewShortID([20]byte{1}) + nonStaker1 := ids.NewShortID([20]byte{2}) vdrs.Add(staker0) vdrs.Add(staker1) - maxMessages := uint32(16) + maxMessages := uint32(9) msgPortion := 0.25 cpuPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, cpuPortion, period, logging.NoLog{}) - // Message Allotment: 0.5 * 0.25 * 15 = 2 - // Message Pool: 12 messages - // Validator should be throttled iff it has exceeded its message allotment and the shared - // message pool is empty + // Message Allotment: 0.5 * 0.25 * 8 = 1 + // Message Pool: 6 messages + // Max Messages: 1 + defaultMaxNonStakerPendingMsgs + // Validator should be throttled if it has exceeded its max messages + // or it has exceeded its message allotment and the shared message pool is empty. 
- // staker0 consumes its own allotment plus 10 messages from the shared pool - for i := 0; i < 12; i++ { - throttler.AddMessage(staker0.ID()) - } + // staker0 consumes its entire message allotment - for i := 0; i < 3; i++ { - throttler.AddMessage(staker1.ID()) - if _, throttle := throttler.GetUtilization(staker1.ID()); throttle { + // Ensure that it is allowed to consume its entire max messages before being throttled + for i := 0; i < int(defaultMaxNonStakerPendingMsgs)+1; i++ { + throttler.AddMessage(staker0.ID()) + if _, throttle := throttler.GetUtilization(staker0.ID()); throttle { t.Fatal("Should not throttle message from staker until it has exceeded its own allotment") } } - // Consume the last message and one extra message from the shared pool - throttler.AddMessage(nonStaker0) - throttler.AddMessage(nonStaker0) - throttler.AddMessage(nonStaker0) + throttler.AddMessage(staker0.ID()) + if _, throttle := throttler.GetUtilization(staker0.ID()); !throttle { + t.Fatal("Should have throttled message after exceeding message") + } + + // Remove messages to reduce staker0 to have its normal message allotment pending + for i := 0; i < int(defaultMaxNonStakerPendingMsgs); i++ { + throttler.RemoveMessage(staker0.ID()) + } + + // Consume the entire message pool among two non-stakers + for i := 0; i < int(defaultMaxNonStakerPendingMsgs); i++ { + throttler.AddMessage(nonStaker0) + throttler.AddMessage(nonStaker1) + + // Neither should be throttled because they are only consuming until their own message cap + // and the shared pool has been emptied. + if _, throttle := throttler.GetUtilization(nonStaker0); throttle { + t.Fatalf("Should not have throttled message from nonStaker0 after %d messages", i) + } + if _, throttle := throttler.GetUtilization(nonStaker1); throttle { + t.Fatalf("Should not have throttled message from nonStaker1 after %d messages", i) + } + } - if _, throttle := throttler.GetUtilization(staker1.ID()); !throttle { - t.Fatal("Should have throttled message from staker after it exceeded its own allotment and the shared pool was empty") + // An additional message from staker0 should now cause it to be throttled since the message pool + // has been emptied.
+ if _, throttle := throttler.GetUtilization(staker0.ID()); throttle { + t.Fatal("Should not have throttled message from staker until it had exceeded its message allotment.") + } + throttler.AddMessage(staker0.ID()) + if _, throttle := throttler.GetUtilization(staker0.ID()); !throttle { + t.Fatal("Should have throttled message from staker0 after it exceeded its message allotment because the message pool was empty.") } } @@ -155,7 +185,7 @@ func TestCalculatesEWMA(t *testing.T) { msgPortion := 0.25 stakerPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, stakerPortion, period, logging.NoLog{}) + throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, stakerPortion, period, logging.NoLog{}) // Spend X CPU time in consecutive intervals and ensure that the throttler correctly calculates EWMA spends := []time.Duration{ diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 025b84d39393..6e8cfd87a968 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1641,6 +1641,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { vdrs, msgChan, 1000, + router.DefaultMaxNonStakerPendingMsgs, router.DefaultStakerPortion, router.DefaultStakerPortion, "", diff --git a/vms/spchainvm/consensus_benchmark_test.go b/vms/spchainvm/consensus_benchmark_test.go index 019ffe0ab3eb..0f49b89ca4ac 100644 --- a/vms/spchainvm/consensus_benchmark_test.go +++ b/vms/spchainvm/consensus_benchmark_test.go @@ -111,6 +111,7 @@ func ConsensusLeader(numBlocks, numTxsPerBlock int, b *testing.B) { vdrs, msgChan, 1000, + router.DefaultMaxNonStakerPendingMsgs, router.DefaultStakerPortion, router.DefaultStakerPortion, "", @@ -253,6 +254,7 @@ func ConsensusFollower(numBlocks, numTxsPerBlock int, b *testing.B) { vdrs, msgChan, 1000, + router.DefaultMaxNonStakerPendingMsgs, router.DefaultStakerPortion, router.DefaultStakerPortion, "", From 8a1cd53d428d4ba050391081eb190154656f8265 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Sat, 29 Aug 2020 13:01:40 -0400 Subject: [PATCH 2/4] Separate message throttling from CPU tracking --- snow/networking/router/chain_router_test.go | 13 +- snow/networking/router/handler.go | 5 - snow/networking/router/handler_test.go | 19 +- snow/networking/router/service_queue.go | 26 ++- snow/networking/router/service_queue_test.go | 19 +- snow/networking/sender/sender_test.go | 19 +- snow/networking/throttler/ewma.go | 211 ++++------------- .../networking/throttler/message_throttler.go | 221 ++++++++++++++++++ snow/networking/throttler/no.go | 26 ++- snow/networking/throttler/throttler.go | 21 ++ snow/networking/throttler/throttler_test.go | 179 ++++++++------ vms/platformvm/vm_test.go | 7 +- vms/spchainvm/consensus_benchmark_test.go | 14 +- 13 files changed, 482 insertions(+), 298 deletions(-) create mode 100644 snow/networking/throttler/message_throttler.go diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index 8fb89e9cb322..ede7004d4b41 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" @@ -39,9 +40,9 @@ func TestShutdown(t *testing.T) { 
validators.NewSet(), nil, 1, - DefaultMaxNonStakerPendingMsgs, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -98,9 +99,9 @@ func TestShutdownTimesOut(t *testing.T) { validators.NewSet(), nil, 1, - DefaultMaxNonStakerPendingMsgs, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 5f58e4e49edb..4aed84cf8c45 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -17,11 +17,6 @@ import ( "github.com/ava-labs/gecko/utils/timer" ) -const ( - DefaultStakerPortion float64 = 0.2 - DefaultMaxNonStakerPendingMsgs uint32 = 3 -) - // Requirement: A set of nodes spamming messages (potentially costly) shouldn't // impact other node's queries. diff --git a/snow/networking/router/handler_test.go b/snow/networking/router/handler_test.go index 14aee95ad413..64657c354472 100644 --- a/snow/networking/router/handler_test.go +++ b/snow/networking/router/handler_test.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" + "github.com/ava-labs/gecko/snow/networking/throttler" ) func TestHandlerDropsTimedOutMessages(t *testing.T) { @@ -41,9 +42,9 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { vdrs, nil, 16, - DefaultMaxNonStakerPendingMsgs, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -84,9 +85,9 @@ func TestHandlerDoesntDrop(t *testing.T) { validators, nil, 16, - DefaultMaxNonStakerPendingMsgs, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -120,9 +121,9 @@ func TestHandlerClosesOnError(t *testing.T) { validators.NewSet(), nil, 16, - DefaultMaxNonStakerPendingMsgs, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/snow/networking/router/service_queue.go b/snow/networking/router/service_queue.go index 5fe5b47a6182..708f87c23d5f 100644 --- a/snow/networking/router/service_queue.go +++ b/snow/networking/router/service_queue.go @@ -31,8 +31,9 @@ type messageQueue interface { type multiLevelQueue struct { lock sync.Mutex - validators validators.Set - throttler throttler.Throttler + validators validators.Set + cpuTracker throttler.CPUTracker + msgThrottler throttler.CountingThrottler // Tracks total CPU consumption intervalConsumption, tierConsumption, cpuInterval time.Duration @@ -67,7 +68,8 @@ func newMultiLevelQueue( ) (messageQueue, chan struct{}) { semaChan := make(chan struct{}, bufferSize) singleLevelSize := bufferSize / len(consumptionRanges) - throttler := throttler.NewEWMAThrottler(vdrs, uint32(bufferSize), maxNonStakerPendingMsgs, msgPortion, cpuPortion, cpuInterval, log) + cpuTracker := throttler.NewEWMATracker(vdrs, cpuPortion, cpuInterval, log) + msgThrottler := throttler.NewMessageThrottler(vdrs, uint32(bufferSize), maxNonStakerPendingMsgs, msgPortion, log) 
queues := make([]singleLevelQueue, len(consumptionRanges)) for index := 0; index < len(queues); index++ { gauge, histogram, err := metrics.registerTierStatistics(index) @@ -85,7 +87,8 @@ func newMultiLevelQueue( return &multiLevelQueue{ validators: vdrs, - throttler: throttler, + cpuTracker: cpuTracker, + msgThrottler: msgThrottler, queues: queues, cpuRanges: consumptionRanges, cpuAllotments: consumptionAllotments, @@ -117,7 +120,7 @@ func (ml *multiLevelQueue) PushMessage(msg message) bool { return false } ml.pendingMessages++ - ml.throttler.AddMessage(msg.validatorID) + ml.msgThrottler.Add(msg.validatorID) select { case ml.semaChan <- struct{}{}: default: @@ -135,7 +138,7 @@ func (ml *multiLevelQueue) PopMessage() (message, error) { msg, err := ml.popMessage() if err == nil { ml.pendingMessages-- - ml.throttler.RemoveMessage(msg.validatorID) + ml.msgThrottler.Remove(msg.validatorID) ml.metrics.pending.Dec() } return msg, err @@ -146,7 +149,7 @@ func (ml *multiLevelQueue) UtilizeCPU(vdr ids.ShortID, duration time.Duration) { ml.lock.Lock() defer ml.lock.Unlock() - ml.throttler.UtilizeCPU(vdr, duration) + ml.cpuTracker.UtilizeCPU(vdr, duration) ml.intervalConsumption += duration ml.tierConsumption += duration if ml.tierConsumption > ml.cpuAllotments[ml.currentTier] { @@ -161,7 +164,8 @@ func (ml *multiLevelQueue) EndInterval() { ml.lock.Lock() defer ml.lock.Unlock() - ml.throttler.EndInterval() + ml.cpuTracker.EndInterval() + ml.msgThrottler.EndInterval() ml.metrics.cpu.Observe(float64(ml.intervalConsumption.Milliseconds())) ml.intervalConsumption = 0 } @@ -190,7 +194,7 @@ func (ml *multiLevelQueue) popMessage() (message, error) { ml.queues[ml.currentTier].waitingTime.Observe(float64(time.Since(msg.received))) // Check where messages from this validator currently belong - cpu, _ := ml.throttler.GetUtilization(msg.validatorID) + cpu := ml.cpuTracker.GetUtilization(msg.validatorID) correctIndex := ml.getPriorityIndex(cpu) // If the message is at least the priority of the current tier @@ -228,12 +232,12 @@ func (ml *multiLevelQueue) pushMessage(msg message) bool { ml.log.Warn("Dropping message due to invalid validatorID") return false } - cpu, throttle := ml.throttler.GetUtilization(validatorID) + throttle := ml.msgThrottler.Throttle(validatorID) if throttle { ml.metrics.throttled.Inc() return false } - + cpu := ml.cpuTracker.GetUtilization(validatorID) queueIndex := ml.getPriorityIndex(cpu) return ml.waterfallMessage(msg, queueIndex) diff --git a/snow/networking/router/service_queue_test.go b/snow/networking/router/service_queue_test.go index 8ef87f0050c7..bad122e57579 100644 --- a/snow/networking/router/service_queue_test.go +++ b/snow/networking/router/service_queue_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" ) @@ -42,10 +43,10 @@ func setupMultiLevelQueue(t *testing.T, bufferSize int) (messageQueue, chan stru consumptionRanges, consumptionAllotments, bufferSize, - DefaultMaxNonStakerPendingMsgs, + throttler.DefaultMaxNonStakerPendingMsgs, time.Second, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, ) return queue, semaChan, vdrs @@ -170,10 +171,10 @@ func TestMultiLevelQueuePrioritizes(t *testing.T) { consumptionRanges, consumptionAllotments, bufferSize, - DefaultMaxNonStakerPendingMsgs, + throttler.DefaultMaxNonStakerPendingMsgs, time.Second, 
- DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, ) // Utilize CPU such that the next message from validator2 will be placed on a lower @@ -265,10 +266,10 @@ func TestMultiLevelQueuePushesDownOldMessages(t *testing.T) { consumptionRanges, consumptionAllotments, bufferSize, - DefaultMaxNonStakerPendingMsgs, + throttler.DefaultMaxNonStakerPendingMsgs, time.Second, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, ) queue.PushMessage(message{ diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 08f6b509d226..26a2b91a37c7 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" "github.com/ava-labs/gecko/snow/networking/router" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" @@ -67,9 +68,9 @@ func TestTimeout(t *testing.T) { validators.NewSet(), nil, 1, - router.DefaultMaxNonStakerPendingMsgs, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -124,9 +125,9 @@ func TestReliableMessages(t *testing.T) { validators.NewSet(), nil, 1, - router.DefaultMaxNonStakerPendingMsgs, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -191,9 +192,9 @@ func TestReliableMessagesToMyself(t *testing.T) { validators.NewSet(), nil, 1, - router.DefaultMaxNonStakerPendingMsgs, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/snow/networking/throttler/ewma.go b/snow/networking/throttler/ewma.go index 0ea9098c15cd..19d984973a8f 100644 --- a/snow/networking/throttler/ewma.go +++ b/snow/networking/throttler/ewma.go @@ -15,17 +15,16 @@ import ( ) const ( - defaultDecayFactor float64 = 2 - defaultIntervalsUntilPruning uint32 = 60 - defaultMinimumCPUAllotment = time.Nanosecond + defaultDecayFactor float64 = 2 + defaultMinimumCPUAllotment = time.Nanosecond ) -type ewmaThrottler struct { +type ewmaCPUTracker struct { lock sync.Mutex log logging.Logger // Track peers - spenders map[[20]byte]*spender + cpuSpenders map[[20]byte]*cpuSpender cumulativeEWMA time.Duration vdrs validators.Set @@ -33,41 +32,22 @@ type ewmaThrottler struct { decayFactor float64 // Factor used to discount the EWMA at every period stakerCPU time.Duration // Amount of CPU time reserved for stakers nonReservedCPU time.Duration // Amount of CPU time that is not reserved for stakers - - // Track pending messages - reservedStakerMessages uint32 // Number of messages reserved for stakers - nonReservedMsgs uint32 // Number of non-reserved messages left to a shared message pool - pendingNonReservedMsgs uint32 // Number of pending messages taken from the shared message pool - - // Threshold of messages taken from the pool before the throttler begins to enforce hard caps on individual peers' pending messages - 
enforceIndividualCapThreshold uint32 - // Cap on number of pending messages allowed to a non-staker (not enforced until above [enforceIndividualCapThreshold] is exceeded) - maxNonStakerPendingMsgs uint32 - - // Statistics adjusted at every interval - currentPeriod uint32 } -// NewEWMAThrottler returns a Throttler that uses exponentially weighted moving +// NewEWMATracker returns a CPUTracker that uses exponentially weighted moving // average to estimate CPU utilization. // -// [maxMessages] is the maximum number of messages allotted to this chain -// [stakerMsgPortion] is the portion of messages to reserve exclusively for stakers -// [stakerCPUPortion] is the portion of CPU utilization to reserve for stakers -// both staker portions should be in the range (0, 1] +// [stakerCPUPortion] is the portion of CPU utilization to reserve for stakers (range (0, 1]) // [period] is the interval of time to use for the calculation of EWMA // -// Note: ewmaThrottler uses the period as the total amount of time per interval, +// Note: ewmaCPUTracker uses the period as the total amount of time per interval, // which is not the limit since it tracks consumption using EWMA. -func NewEWMAThrottler( +func NewEWMATracker( vdrs validators.Set, - maxMessages, - maxNonStakerPendingMsgs uint32, - stakerMsgPortion, stakerCPUPortion float64, period time.Duration, log logging.Logger, -) Throttler { +) CPUTracker { // Amount of CPU time reserved for processing messages from stakers stakerCPU := time.Duration(float64(period) * stakerCPUPortion) if stakerCPU < defaultMinimumCPUAllotment { @@ -82,69 +62,30 @@ func NewEWMAThrottler( nonReservedCPU = defaultMinimumCPUAllotment } - // Number of messages reserved for Stakers vs. Non-Stakers - reservedStakerMessages := uint32(stakerMsgPortion * float64(maxMessages)) - nonReservedMsgs := maxMessages - reservedStakerMessages - - throttler := &ewmaThrottler{ - spenders: make(map[[20]byte]*spender), - vdrs: vdrs, - log: log, + throttler := &ewmaCPUTracker{ + cpuSpenders: make(map[[20]byte]*cpuSpender), + vdrs: vdrs, + log: log, decayFactor: defaultDecayFactor, stakerCPU: stakerCPU, nonReservedCPU: nonReservedCPU, - - reservedStakerMessages: reservedStakerMessages, - nonReservedMsgs: nonReservedMsgs, - enforceIndividualCapThreshold: nonReservedMsgs / 2, // If the pool is half empty, begin to enforce the max message caps - maxNonStakerPendingMsgs: maxNonStakerPendingMsgs, } - // Add validators to spenders, so that they will be calculated correctly in + // Add validators to cpuSpenders, so that they will be calculated correctly in // EndInterval for _, vdr := range vdrs.List() { - throttler.spenders[vdr.ID().Key()] = &spender{} + throttler.cpuSpenders[vdr.ID().Key()] = &cpuSpender{} } // Call EndInterval to calculate initial period statistics and initial - // spender values for validators + // cpuSpender values for validators throttler.EndInterval() return throttler } -func (et *ewmaThrottler) AddMessage(validatorID ids.ShortID) { - et.lock.Lock() - defer et.lock.Unlock() - - sp := et.getSpender(validatorID) - sp.pendingMessages++ - - // If the spender has exceeded its message allotment, then the additional - // message is taken from the pool - if sp.pendingMessages > sp.msgAllotment { - sp.pendingPoolMessages++ - et.pendingNonReservedMsgs++ - } -} - -func (et *ewmaThrottler) RemoveMessage(validatorID ids.ShortID) { - et.lock.Lock() - defer et.lock.Unlock() - - sp := et.getSpender(validatorID) - sp.pendingMessages-- - - // If the spender has pending messages taken from the pool, - // 
they are the first messages to be removed. - if sp.pendingPoolMessages > 0 { - sp.pendingPoolMessages-- - et.pendingNonReservedMsgs-- - } -} - -func (et *ewmaThrottler) UtilizeCPU( +func (et *ewmaCPUTracker) UtilizeCPU( validatorID ids.ShortID, consumption time.Duration, ) { @@ -153,140 +94,86 @@ func (et *ewmaThrottler) UtilizeCPU( sp := et.getSpender(validatorID) sp.cpuEWMA += consumption - sp.lastSpend = et.currentPeriod et.cumulativeEWMA += consumption } -// Returns CPU GetUtilization metric as percentage of expected utilization and -// boolean specifying whether or not the validator has exceeded its message -// allotment. -func (et *ewmaThrottler) GetUtilization( - validatorID ids.ShortID, -) (float64, bool) { +// GetUtilization returns a percentage of expected CPU utilization of the peer +// corresponding to [validatorID] +func (et *ewmaCPUTracker) GetUtilization(validatorID ids.ShortID) float64 { et.lock.Lock() defer et.lock.Unlock() sharedUtilization := float64(et.cumulativeEWMA) / float64(et.nonReservedCPU) sp := et.getSpender(validatorID) if !sp.staking { - exceedsMessageAllotment := et.pendingNonReservedMsgs > et.nonReservedMsgs || // the shared message pool has been taken - (sp.pendingMessages > sp.maxMessages && // Spender has exceeded its individual cap - et.pendingNonReservedMsgs > et.enforceIndividualCapThreshold) // And the threshold before enforcing the cap has been reached - - if exceedsMessageAllotment { - et.log.Verbo("Throttling non-staker %s: %s. Pending pool messages: %d/%d.", - validatorID, - sp, - et.pendingNonReservedMsgs, - et.nonReservedMsgs) - } - return sharedUtilization, exceedsMessageAllotment - } - - // Staker should only be throttled if it has exceeded its message allotment - // and there are either no messages left in the shared pool or it has - // exceeded its own maximum message allocation. - exceedsMessageAllotment := sp.pendingMessages > sp.msgAllotment && // Throttle if the staker has exceeded its allotment - (et.pendingNonReservedMsgs > et.nonReservedMsgs || // And either the shared message pool is empty - (et.pendingNonReservedMsgs > et.enforceIndividualCapThreshold && // Or the threshold before enforcing the cap has been reached - sp.pendingMessages > sp.maxMessages)) // and this staker has exceeded its individual cap - - if exceedsMessageAllotment { - et.log.Debug("Throttling staker %s: %s. 
Pending pool messages: %d/%d.", - validatorID, - sp, - et.pendingNonReservedMsgs, - et.nonReservedMsgs) + return sharedUtilization } - return math.Min(float64(sp.cpuEWMA)/float64(sp.expectedCPU), sharedUtilization), exceedsMessageAllotment + return math.Min(float64(sp.cpuEWMA)/float64(sp.expectedCPU), sharedUtilization) } -func (et *ewmaThrottler) EndInterval() { +// EndInterval registers the end of a given CPU interval by discounting +// all cpuSpenders' cpuEWMA and removing outstanding spenders that have sufficiently +// low cpuEWMA stats +func (et *ewmaCPUTracker) EndInterval() { et.lock.Lock() defer et.lock.Unlock() - et.currentPeriod++ - et.cumulativeEWMA = time.Duration(float64(et.cumulativeEWMA) / et.decayFactor) stakingWeight := et.vdrs.Weight() - for key, spender := range et.spenders { - spender.cpuEWMA = time.Duration(float64(spender.cpuEWMA) / et.decayFactor) + removed := 0 + for key, cpuSpender := range et.cpuSpenders { + cpuSpender.cpuEWMA = time.Duration(float64(cpuSpender.cpuEWMA) / et.decayFactor) if vdr, exists := et.vdrs.Get(ids.NewShortID(key)); exists { stakerPortion := float64(vdr.Weight()) / float64(stakingWeight) // Calculate staker allotment here - spender.staking = true - spender.msgAllotment = uint32(float64(et.reservedStakerMessages) * stakerPortion) - spender.maxMessages = spender.msgAllotment + et.maxNonStakerPendingMsgs - spender.expectedCPU = time.Duration(float64(et.stakerCPU)*stakerPortion) + defaultMinimumCPUAllotment + cpuSpender.staking = true + cpuSpender.expectedCPU = time.Duration(float64(et.stakerCPU)*stakerPortion) + defaultMinimumCPUAllotment continue } - if spender.lastSpend+defaultIntervalsUntilPruning < et.currentPeriod && spender.pendingMessages == 0 { - et.log.Debug("Removing validator from throttler after not hearing from it for %d periods", - et.currentPeriod-spender.lastSpend) - delete(et.spenders, key) + if cpuSpender.cpuEWMA == 0 { + removed++ + delete(et.cpuSpenders, key) } - // If the validator is not a staker and was not deleted, set its spender + // If the validator is not a staker and was not deleted, set its cpuSpender // attributes - spender.staking = false - spender.msgAllotment = 0 - spender.maxMessages = et.maxNonStakerPendingMsgs - spender.expectedCPU = defaultMinimumCPUAllotment + cpuSpender.staking = false + cpuSpender.expectedCPU = defaultMinimumCPUAllotment } + et.log.Debug("Removed %d validators from CPU Tracker.", removed) } -// getSpender returns the [spender] corresponding to [validatorID] -func (et *ewmaThrottler) getSpender(validatorID ids.ShortID) *spender { +// getSpender returns the [cpuSpender] corresponding to [validatorID] +func (et *ewmaCPUTracker) getSpender(validatorID ids.ShortID) *cpuSpender { validatorKey := validatorID.Key() - if sp, exists := et.spenders[validatorKey]; exists { + if sp, exists := et.cpuSpenders[validatorKey]; exists { return sp } - // If this validator did not exist in spenders, create it and return - sp := &spender{ - maxMessages: et.maxNonStakerPendingMsgs, + // If this validator did not exist in cpuSpenders, create it and return + sp := &cpuSpender{ expectedCPU: defaultMinimumCPUAllotment, } - et.spenders[validatorKey] = sp + et.cpuSpenders[validatorKey] = sp return sp } -type spender struct { - // Last period that this spender utilized the CPU - lastSpend uint32 - - // Number of pending messages this spender has taken from the pool - pendingPoolMessages uint32 - - // Number of messages this spender currently has pending - pendingMessages uint32 - - // Number of messages allocated to 
this spender as a staker - msgAllotment uint32 - - // Max number of messages this spender can use even if the shared pool is - // non-empty - maxMessages uint32 - - // EWMA of this spender's CPU utilization +type cpuSpender struct { + // EWMA of this cpuSpender's CPU utilization cpuEWMA time.Duration // The expected CPU utilization of this peer expectedCPU time.Duration - // Flag to indicate if this spender is a staker + // Flag to indicate if this cpuSpender is a staker staking bool } -func (sp *spender) String() string { - return fmt.Sprintf("Spender(Messages: (%d+%d)/(%d+%d), CPU: %s/%s)", - sp.pendingPoolMessages, - sp.pendingMessages-sp.pendingPoolMessages, - sp.msgAllotment, - sp.maxMessages-sp.msgAllotment, +func (sp *cpuSpender) String() string { + return fmt.Sprintf("CPUTracker(CPU: %s/%s)", sp.cpuEWMA, sp.expectedCPU, ) diff --git a/snow/networking/throttler/message_throttler.go b/snow/networking/throttler/message_throttler.go new file mode 100644 index 000000000000..7eb3b4cd626e --- /dev/null +++ b/snow/networking/throttler/message_throttler.go @@ -0,0 +1,221 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throttler + +import ( + "fmt" + "sync" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/snow/validators" + "github.com/ava-labs/gecko/utils/logging" +) + +const ( + defaultIntervalsUntilPruning uint32 = 60 +) + +type messageThrottler struct { + lock sync.Mutex + log logging.Logger + + // Track peers + msgSpenders map[[20]byte]*msgSpender + vdrs validators.Set + + // Track pending messages + reservedStakerMessages uint32 // Number of messages reserved for stakers + nonReservedMsgs uint32 // Number of non-reserved messages left to a shared message pool + pendingNonReservedMsgs uint32 // Number of pending messages taken from the shared message pool + + // Cap on number of pending messages allowed to a non-staker + maxNonStakerPendingMsgs uint32 + + // Statistics adjusted at every interval + currentPeriod uint32 +} + +// NewMessageThrottler returns a MessageThrottler that throttles peers +// when they have too many pending messages outstanding. +// +// [maxMessages] is the maximum number of messages allotted to this chain +// [stakerMsgPortion] is the portion of messages to reserve exclusively for stakers +// should be in the range (0, 1] +func NewMessageThrottler( + vdrs validators.Set, + maxMessages, + maxNonStakerPendingMsgs uint32, + stakerMsgPortion float64, + log logging.Logger, +) CountingThrottler { + // Number of messages reserved for Stakers vs. 
Non-Stakers + reservedStakerMessages := uint32(stakerMsgPortion * float64(maxMessages)) + nonReservedMsgs := maxMessages - reservedStakerMessages + + throttler := &messageThrottler{ + msgSpenders: make(map[[20]byte]*msgSpender), + vdrs: vdrs, + log: log, + + reservedStakerMessages: reservedStakerMessages, + nonReservedMsgs: nonReservedMsgs, + maxNonStakerPendingMsgs: maxNonStakerPendingMsgs, + } + + // Add validators to msgSpenders, so that they will be calculated correctly in + // EndInterval + for _, vdr := range vdrs.List() { + throttler.msgSpenders[vdr.ID().Key()] = &msgSpender{} + } + + // Call EndInterval to calculate initial period statistics and initial + // msgSpender values for validators + throttler.EndInterval() + return throttler +} + +func (et *messageThrottler) Add(validatorID ids.ShortID) { + et.lock.Lock() + defer et.lock.Unlock() + + sp := et.getSpender(validatorID) + sp.pendingMessages++ + sp.lastSpend = et.currentPeriod + + // If the msgSpender has exceeded its message allotment, then the additional + // message is taken from the pool + if sp.pendingMessages > sp.msgAllotment { + sp.pendingPoolMessages++ + et.pendingNonReservedMsgs++ + } +} + +func (et *messageThrottler) Remove(validatorID ids.ShortID) { + et.lock.Lock() + defer et.lock.Unlock() + + sp := et.getSpender(validatorID) + sp.pendingMessages-- + + // If the msgSpender has pending messages taken from the pool, + // they are the first messages to be removed. + if sp.pendingPoolMessages > 0 { + sp.pendingPoolMessages-- + et.pendingNonReservedMsgs-- + } +} + +// Throttle returns true if messages from [validatorID] should be throttled due +// to having too many pending messages +func (et *messageThrottler) Throttle( + validatorID ids.ShortID, +) bool { + et.lock.Lock() + defer et.lock.Unlock() + + sp := et.getSpender(validatorID) + if !sp.staking { + exceedsMessageAllotment := et.pendingNonReservedMsgs > et.nonReservedMsgs || // the shared message pool has been taken + (sp.pendingMessages > sp.maxMessages) // Spender has exceeded its individual cap + + if exceedsMessageAllotment { + et.log.Verbo("Throttling non-staker %s: %s. Pending pool messages: %d/%d.", + validatorID, + sp, + et.pendingNonReservedMsgs, + et.nonReservedMsgs) + } + return exceedsMessageAllotment + } + + exceedsMessageAllotment := sp.pendingMessages > sp.msgAllotment && // Throttle if the staker has exceeded its allotment + (et.pendingNonReservedMsgs > et.nonReservedMsgs || // And either the shared message pool is empty + sp.pendingMessages > sp.maxMessages) // or this staker has exceeded its individual cap + + if exceedsMessageAllotment { + et.log.Debug("Throttling staker %s: %s. 
Pending pool messages: %d/%d.", + validatorID, + sp, + et.pendingNonReservedMsgs, + et.nonReservedMsgs) + } + return exceedsMessageAllotment +} + +func (et *messageThrottler) EndInterval() { + et.lock.Lock() + defer et.lock.Unlock() + + et.currentPeriod++ + stakingWeight := et.vdrs.Weight() + + for key, msgSpender := range et.msgSpenders { + if vdr, exists := et.vdrs.Get(ids.NewShortID(key)); exists { + stakerPortion := float64(vdr.Weight()) / float64(stakingWeight) + + // Calculate staker allotment here + msgSpender.staking = true + msgSpender.msgAllotment = uint32(float64(et.reservedStakerMessages) * stakerPortion) + msgSpender.maxMessages = msgSpender.msgAllotment + et.maxNonStakerPendingMsgs + continue + } + + if msgSpender.lastSpend+defaultIntervalsUntilPruning < et.currentPeriod && msgSpender.pendingMessages == 0 { + et.log.Debug("Removing validator from throttler after not hearing from it for %d periods", + et.currentPeriod-msgSpender.lastSpend) + delete(et.msgSpenders, key) + } + + // If the validator is not a staker and was not deleted, set its msgSpender + // attributes + msgSpender.staking = false + msgSpender.msgAllotment = 0 + msgSpender.maxMessages = et.maxNonStakerPendingMsgs + } +} + +// getSpender returns the [msgSpender] corresponding to [validatorID] +func (et *messageThrottler) getSpender(validatorID ids.ShortID) *msgSpender { + validatorKey := validatorID.Key() + if sp, exists := et.msgSpenders[validatorKey]; exists { + return sp + } + + // If this validator did not exist in msgSpenders, create it and return + sp := &msgSpender{ + maxMessages: et.maxNonStakerPendingMsgs, + } + et.msgSpenders[validatorKey] = sp + return sp +} + +type msgSpender struct { + // Last period that this msgSpender utilized the CPU + lastSpend uint32 + + // Number of pending messages this msgSpender has taken from the pool + pendingPoolMessages uint32 + + // Number of messages this msgSpender currently has pending + pendingMessages uint32 + + // Number of messages allocated to this msgSpender as a staker + msgAllotment uint32 + + // Max number of messages this msgSpender can use even if the shared pool is + // non-empty + maxMessages uint32 + + // Flag to indicate if this msgSpender is a staker + staking bool +} + +func (sp *msgSpender) String() string { + return fmt.Sprintf("MsgSpender(Messages: (%d+%d)/(%d+%d))", + sp.pendingPoolMessages, + sp.pendingMessages-sp.pendingPoolMessages, + sp.msgAllotment, + sp.maxMessages-sp.msgAllotment, + ) +} diff --git a/snow/networking/throttler/no.go b/snow/networking/throttler/no.go index 02f6dc18ae79..f734336375b9 100644 --- a/snow/networking/throttler/no.go +++ b/snow/networking/throttler/no.go @@ -9,17 +9,27 @@ import ( "github.com/ava-labs/gecko/ids" ) -type noThrottler struct{} +type noCountThrottler struct{} -func (noThrottler) AddMessage(ids.ShortID) {} +func (noCountThrottler) Add(ids.ShortID) {} -func (noThrottler) RemoveMessage(ids.ShortID) {} +func (noCountThrottler) Remove(ids.ShortID) {} -func (noThrottler) UtilizeCPU(ids.ShortID, time.Duration) {} +func (noCountThrottler) Throttle(ids.ShortID) bool { return false } -func (noThrottler) GetUtilization(ids.ShortID) (float64, bool) { return 0, false } +func (noCountThrottler) EndInterval() {} -func (noThrottler) EndInterval() {} +// NewNoCountThrottler returns a CountingThrottler that will never throttle +func NewNoCountThrottler() CountingThrottler { return noCountThrottler{} } -// NewNoThrottler returns a throttler that will never throttle -func NewNoThrottler() Throttler { return noThrottler{} 
} +type noCPUTracker struct{} + +func (noCPUTracker) UtilizeCPU(ids.ShortID, time.Duration) {} + +func (noCPUTracker) GetUtilization(ids.ShortID) float64 { return 0 } + +func (noCPUTracker) EndInterval() {} + +// NewNoCPUTracker returns a CPUTracker that does not track CPU usage and +// always returns 0 for the utilization value +func NewNoCPUTracker() CPUTracker { return noCPUTracker{} } diff --git a/snow/networking/throttler/throttler.go b/snow/networking/throttler/throttler.go index 3c0ef9679bab..bf4411863d16 100644 --- a/snow/networking/throttler/throttler.go +++ b/snow/networking/throttler/throttler.go @@ -9,6 +9,11 @@ import ( "github.com/ava-labs/gecko/ids" ) +const ( + DefaultMaxNonStakerPendingMsgs uint32 = 3 + DefaultStakerPortion float64 = 0.2 +) + // Throttler provides an interface to register consumption // of resources and prioritize messages from nodes that have // used less CPU time. @@ -19,3 +24,19 @@ type Throttler interface { GetUtilization(ids.ShortID) (float64, bool) // Returns the CPU based priority and whether or not the peer has too many pending messages EndInterval() // Notify throttler that the current period has ended } + +// CPUTracker tracks the consumption of CPU time +type CPUTracker interface { + UtilizeCPU(ids.ShortID, time.Duration) + GetUtilization(ids.ShortID) float64 + EndInterval() +} + +// CountingThrottler tracks the usage of a discrete resource (ex. pending messages) by a peer +// and determines whether or not a peer should be throttled. +type CountingThrottler interface { + Add(ids.ShortID) + Remove(ids.ShortID) + Throttle(ids.ShortID) bool + EndInterval() +} diff --git a/snow/networking/throttler/throttler_test.go b/snow/networking/throttler/throttler_test.go index c7d3d2cc54ab..5c9c640d72bf 100644 --- a/snow/networking/throttler/throttler_test.go +++ b/snow/networking/throttler/throttler_test.go @@ -12,52 +12,80 @@ import ( "github.com/ava-labs/gecko/utils/logging" ) -const ( - defaultMaxNonStakerPendingMsgs uint32 = 3 -) - -func TestEWMAThrottler(t *testing.T) { +func TestEWMATrackerPrioritizes(t *testing.T) { vdrs := validators.NewSet() validator0 := validators.GenerateRandomValidator(1) validator1 := validators.GenerateRandomValidator(1) + nonStaker := ids.NewShortID([20]byte{1}) vdrs.Add(validator0) vdrs.Add(validator1) - maxMessages := uint32(16) - msgPortion := 0.25 cpuPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewEWMATracker(vdrs, cpuPortion, period, logging.NoLog{}) throttler.UtilizeCPU(validator0.ID(), 25*time.Millisecond) throttler.UtilizeCPU(validator1.ID(), 5*time.Second) - cpu0, throttle0 := throttler.GetUtilization(validator0.ID()) - cpu1, throttle1 := throttler.GetUtilization(validator1.ID()) - - if throttle0 { - t.Fatalf("Should not throttle validator0 with no pending messages") - } - if throttle1 { - t.Fatalf("Should not throttle validator1 with no pending messages") - } + cpu0 := throttler.GetUtilization(validator0.ID()) + cpu1 := throttler.GetUtilization(validator1.ID()) + cpuNonStaker := throttler.GetUtilization(nonStaker) if cpu1 <= cpu0 { t.Fatalf("CPU utilization for validator1: %f should be greater than that of validator0: %f", cpu1, cpu0) } - // Test that throttler prevents unknown validators from taking up half the message queue - for i := uint32(0); i < maxMessages; i++ { - throttler.AddMessage(ids.NewShortID([20]byte{byte(i)})) + if cpuNonStaker < cpu1 { + t.Fatalf("CPU 
Utilization for non-staker: %f should be greater than or equal to the CPU Utilization for the highest spending staker: %f", cpuNonStaker, cpu1) + } +} + +func TestEWMATrackerPrunesSpenders(t *testing.T) { + vdrs := validators.NewSet() + staker0 := validators.GenerateRandomValidator(1) + staker1 := validators.GenerateRandomValidator(1) + nonStaker0 := ids.NewShortID([20]byte{1}) + nonStaker1 := ids.NewShortID([20]byte{2}) + + vdrs.Add(staker0) + vdrs.Add(staker1) + + cpuPortion := 0.25 + period := time.Second + throttler := NewEWMATracker(vdrs, cpuPortion, period, logging.NoLog{}) + + throttler.UtilizeCPU(staker0.ID(), 1.0) + throttler.UtilizeCPU(nonStaker0, 1.0) + + // 3 Cases: + // Stakers should not be pruned + // Non-stakers with non-zero cpuEWMA should not be pruned + // Non-stakers with cpuEWMA of 0 should be pruned + + // After 64 intervals nonStaker0 should be removed because its cpuEWMA statistic should reach 0 + // while nonStaker1 utilizes the CPU in every interval, so it should not be removed. + for i := 0; i < 64; i++ { + throttler.UtilizeCPU(nonStaker1, 1.0) + throttler.EndInterval() } - _, throttle := throttler.GetUtilization(ids.NewShortID([20]byte{'s', 'y', 'b', 'i', 'l'})) - if !throttle { - t.Fatal("Throttler should have started throttling messages from unknown peers") + // Ensure that the validators and the non-staker heard from every interval were not pruned + ewmat := throttler.(*ewmaCPUTracker) + if _, ok := ewmat.cpuSpenders[staker0.ID().Key()]; !ok { + t.Fatal("Staker was pruned from the set of spenders") + } + if _, ok := ewmat.cpuSpenders[staker1.ID().Key()]; !ok { + t.Fatal("Staker was pruned from the set of spenders") + } + if _, ok := ewmat.cpuSpenders[nonStaker0.Key()]; ok { + t.Fatal("Non-staker, not heard from in 64 periods, should have been pruned from the set of spenders") + } + if _, ok := ewmat.cpuSpenders[nonStaker1.Key()]; ok { + t.Fatal("Non-staker heard from in every period, was pruned from the set of spenders") } } -func TestThrottlerPrunesSpenders(t *testing.T) { +func TestMessageThrottlerPrunesSpenders(t *testing.T) { vdrs := validators.NewSet() staker0 := validators.GenerateRandomValidator(1) staker1 := validators.GenerateRandomValidator(1) @@ -69,44 +97,50 @@ func TestThrottlerPrunesSpenders(t *testing.T) { vdrs.Add(staker1) maxMessages := uint32(1024) - cpuPortion := 0.25 msgPortion := 0.25 - period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, cpuPortion, period, logging.NoLog{}) - throttler.AddMessage(nonStaker2) // nonStaker2 should not be removed with a pending message - throttler.UtilizeCPU(nonStaker0, 1.0) - throttler.UtilizeCPU(nonStaker1, 1.0) - intervalsUntilPruning := int(defaultIntervalsUntilPruning) - // Let two intervals pass with no activity to ensure that nonStaker1 can be pruned + throttler := NewMessageThrottler(vdrs, maxMessages, DefaultMaxNonStakerPendingMsgs, msgPortion, logging.NoLog{}) + + // 4 Cases: + // Stakers should not be pruned + // Non-stakers with pending messages should not be pruned + // Non-stakers heard from recently should not be pruned + // Non-stakers not heard from in [defaultIntervalsUntilPruning] should be pruned + + // Add pending messages for nonStaker1 and nonStaker2 + throttler.Add(nonStaker2) // Will not be removed, so it should not be pruned + throttler.Add(nonStaker1) + throttler.EndInterval() + throttler.Remove(nonStaker1) // The pending message was removed, so nonStaker1 should be pruned throttler.EndInterval() - 
throttler.UtilizeCPU(nonStaker0, 1.0) + intervalsUntilPruning := int(defaultIntervalsUntilPruning) // Let the required number of intervals elapse to allow nonStaker1 to be pruned for i := 0; i < intervalsUntilPruning; i++ { + throttler.Add(nonStaker0) // nonStaker0 is heard from in every interval, so it should not be pruned throttler.EndInterval() + throttler.Remove(nonStaker0) } - // Ensure that the validators and the non-staker heard from in the past [intervalsUntilPruning] were not pruned - ewmat := throttler.(*ewmaThrottler) - if _, ok := ewmat.spenders[staker0.ID().Key()]; !ok { + msgThrottler := throttler.(*messageThrottler) + if _, ok := msgThrottler.msgSpenders[staker0.ID().Key()]; !ok { t.Fatal("Staker was pruned from the set of spenders") } - if _, ok := ewmat.spenders[staker1.ID().Key()]; !ok { + if _, ok := msgThrottler.msgSpenders[staker1.ID().Key()]; !ok { t.Fatal("Staker was pruned from the set of spenders") } - if _, ok := ewmat.spenders[nonStaker0.Key()]; !ok { - t.Fatal("Non-staker heard from recently was pruned from the set of spenders") + if _, ok := msgThrottler.msgSpenders[nonStaker0.Key()]; !ok { + t.Fatal("Non-staker heard from within [intervalsUntilPruning] was removed from the set of spenders") } - if _, ok := ewmat.spenders[nonStaker1.Key()]; ok { - t.Fatal("Non-staker not heard from in a long time was not pruned from the set of spenders") + if _, ok := msgThrottler.msgSpenders[nonStaker1.Key()]; ok { + t.Fatal("Non-staker not heard from within [intervalsUntilPruning] was not removed from the set of spenders") } - if _, ok := ewmat.spenders[nonStaker2.Key()]; !ok { + if _, ok := msgThrottler.msgSpenders[nonStaker2.Key()]; !ok { t.Fatal("Non-staker with a pending message was pruned from the set of spenders") } } -func TestThrottleStaker(t *testing.T) { +func TestMessageThrottling(t *testing.T) { vdrs := validators.NewSet() staker0 := validators.GenerateRandomValidator(1) staker1 := validators.GenerateRandomValidator(1) @@ -116,62 +150,69 @@ func TestThrottleStaker(t *testing.T) { vdrs.Add(staker0) vdrs.Add(staker1) - maxMessages := uint32(9) + maxMessages := uint32(8) msgPortion := 0.25 - cpuPortion := 0.25 - period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewMessageThrottler(vdrs, maxMessages, DefaultMaxNonStakerPendingMsgs, msgPortion, logging.NoLog{}) // Message Allotment: 0.5 * 0.25 * 8 = 1 - // Message Pool: 6 messages - // Max Messages: 1 + defaultMaxNonStakerPendingMsgs + // Message Pool: 8 * 0.75 = 6 messages + // Max Messages: 1 + DefaultMaxNonStakerPendingMsgs // Validator should be throttled if it has exceeded its max messages // or it has exceeded its message allotment and the shared message pool is empty. 
// staker0 consumes its entire message allotment
 	// Ensure that it is allowed to consume its entire max messages before being throttled
-	for i := 0; i < int(defaultMaxNonStakerPendingMsgs)+1; i++ {
-		throttler.AddMessage(staker0.ID())
-		if _, throttle := throttler.GetUtilization(staker0.ID()); throttle {
+	for i := 0; i < int(DefaultMaxNonStakerPendingMsgs)+1; i++ {
+		throttler.Add(staker0.ID())
+		if throttler.Throttle(staker0.ID()) {
 			t.Fatal("Should not throttle message from staker until it has exceeded its own allotment")
 		}
 	}
-	throttler.AddMessage(staker0.ID())
-	if _, throttle := throttler.GetUtilization(staker0.ID()); !throttle {
-		t.Fatal("Should have throttled message after exceeding message")
+	// Ensure staker is throttled after exceeding its own max messages cap
+	throttler.Add(staker0.ID())
+	if !throttler.Throttle(staker0.ID()) {
+		t.Fatal("Should have throttled message after exceeding message cap")
 	}
 
-	// Remove messages to reduce staker0 to have its normal message allotment pending
-	for i := 0; i < int(defaultMaxNonStakerPendingMsgs); i++ {
-		throttler.RemoveMessage(staker0.ID())
+	// Remove messages so that staker0 has only its normal message allotment pending
+	for i := 0; i < int(DefaultMaxNonStakerPendingMsgs)+1; i++ {
+		throttler.Remove(staker0.ID())
 	}
 
 	// Consume the entire message pool among two non-stakers
-	for i := 0; i < int(defaultMaxNonStakerPendingMsgs); i++ {
-		throttler.AddMessage(nonStaker0)
-		throttler.AddMessage(nonStaker1)
+	for i := 0; i < int(DefaultMaxNonStakerPendingMsgs); i++ {
+		throttler.Add(nonStaker0)
+		throttler.Add(nonStaker1)
 
 		// Neither should be throttled because they are only consuming up to their own message cap
 		// and the shared pool has been emptied.
-		if _, throttle := throttler.GetUtilization(nonStaker0); throttle {
+		if throttler.Throttle(nonStaker0) {
 			t.Fatalf("Should not have throttled message from nonStaker0 after %d messages", i)
 		}
-		if _, throttle := throttler.GetUtilization(nonStaker1); throttle {
+		if throttler.Throttle(nonStaker1) {
 			t.Fatalf("Should not have throttled message from nonStaker1 after %d messages", i)
 		}
 	}
 
 	// An additional message from staker0 should now cause it to be throttled since the message pool
 	// has been emptied.
- if _, throttle := throttler.GetUtilization(staker0.ID()); throttle { + if throttler.Throttle(staker0.ID()) { t.Fatal("Should not have throttled message from staker until it had exceeded its message allotment.") } - throttler.AddMessage(staker0.ID()) - if _, throttle := throttler.GetUtilization(staker0.ID()); !throttle { + throttler.Add(staker0.ID()) + if !throttler.Throttle(staker0.ID()) { t.Fatal("Should have throttled message from staker0 after it exceeded its message allotment because the message pool was empty.") } + + if !throttler.Throttle(nonStaker0) { + t.Fatal("Should have throttled message from nonStaker0 after the message pool was emptied") + } + + if !throttler.Throttle(nonStaker1) { + t.Fatal("Should have throttled message from nonStaker1 after the message pool was emptied") + } } func TestCalculatesEWMA(t *testing.T) { @@ -181,11 +222,9 @@ func TestCalculatesEWMA(t *testing.T) { vdrs.Add(validator0) vdrs.Add(validator1) - maxMessages := uint32(16) - msgPortion := 0.25 stakerPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, defaultMaxNonStakerPendingMsgs, msgPortion, stakerPortion, period, logging.NoLog{}) + throttler := NewEWMATracker(vdrs, stakerPortion, period, logging.NoLog{}) // Spend X CPU time in consecutive intervals and ensure that the throttler correctly calculates EWMA spends := []time.Duration{ @@ -206,7 +245,7 @@ func TestCalculatesEWMA(t *testing.T) { throttler.EndInterval() } - ewmat := throttler.(*ewmaThrottler) + ewmat := throttler.(*ewmaCPUTracker) sp := ewmat.getSpender(validator0.ID()) if sp.cpuEWMA != ewma { t.Fatalf("EWMA Throttler calculated EWMA incorrectly, expected: %s, but calculated: %s", ewma, sp.cpuEWMA) diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 6e8cfd87a968..a3ff946c91fc 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -26,6 +26,7 @@ import ( "github.com/ava-labs/gecko/snow/engine/snowman/bootstrap" "github.com/ava-labs/gecko/snow/networking/router" "github.com/ava-labs/gecko/snow/networking/sender" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/constants" @@ -1641,9 +1642,9 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { vdrs, msgChan, 1000, - router.DefaultMaxNonStakerPendingMsgs, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/vms/spchainvm/consensus_benchmark_test.go b/vms/spchainvm/consensus_benchmark_test.go index 0f49b89ca4ac..a4e5b0d34f4a 100644 --- a/vms/spchainvm/consensus_benchmark_test.go +++ b/vms/spchainvm/consensus_benchmark_test.go @@ -18,7 +18,9 @@ import ( "github.com/ava-labs/gecko/snow/engine/snowman/bootstrap" "github.com/ava-labs/gecko/snow/networking/router" "github.com/ava-labs/gecko/snow/networking/sender" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" + "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" @@ -111,9 +113,9 @@ func ConsensusLeader(numBlocks, numTxsPerBlock int, b *testing.B) { vdrs, msgChan, 1000, - router.DefaultMaxNonStakerPendingMsgs, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, 
"", prometheus.NewRegistry(), ) @@ -254,9 +256,9 @@ func ConsensusFollower(numBlocks, numTxsPerBlock int, b *testing.B) { vdrs, msgChan, 1000, - router.DefaultMaxNonStakerPendingMsgs, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) From 7531e761ad6cb2b117eee2a0174bd83761035c34 Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Sat, 29 Aug 2020 16:08:59 -0400 Subject: [PATCH 3/4] Rename command line parameter setting max number of peer's pending msgs --- main/params.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main/params.go b/main/params.go index 03997835749e..8c9d9aae8fab 100644 --- a/main/params.go +++ b/main/params.go @@ -202,7 +202,7 @@ func init() { fs.Uint64Var(&Config.DisabledStakingWeight, "staking-disabled-weight", 1, "Weight to provide to each peer when staking is disabled") // Throttling: - fs.UintVar(&Config.MaxNonStakerPendingMsgs, "max-non-staker-pending", 3, "Maximum number of messages a non-staker is allowed to have pending.") + fs.UintVar(&Config.MaxNonStakerPendingMsgs, "max-non-staker-pending-msgs", 3, "Maximum number of messages a non-staker is allowed to have pending.") fs.Float64Var(&Config.StakerMsgPortion, "staker-msg-reserved", 0.2, "Reserve a portion of the chain message queue's space for stakers.") fs.Float64Var(&Config.StakerCPUPortion, "staker-cpu-reserved", 0.2, "Reserve a portion of the chain's CPU time for stakers.") From 6d885f26325f23e2f1c6f085c1e16f26b87375a0 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Sun, 30 Aug 2020 05:32:05 -0400 Subject: [PATCH 4/4] cleaned up import --- vms/spchainvm/consensus_benchmark_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/vms/spchainvm/consensus_benchmark_test.go b/vms/spchainvm/consensus_benchmark_test.go index 87afd9385278..1f73ed5fd105 100644 --- a/vms/spchainvm/consensus_benchmark_test.go +++ b/vms/spchainvm/consensus_benchmark_test.go @@ -20,7 +20,6 @@ import ( "github.com/ava-labs/gecko/snow/networking/sender" "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" - "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging"