diff --git a/chains/manager.go b/chains/manager.go index 4a285e2b36c1..2991935ff379 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -115,6 +115,7 @@ type manager struct { stakingEnabled bool // True iff the network has staking enabled stakerMsgPortion, stakerCPUPortion float64 + maxNonStakerPendingMsgs uint32 log logging.Logger logFactory logging.Factory vmManager vms.Manager // Manage mappings from vm ID --> vm @@ -152,6 +153,7 @@ type manager struct { // TODO: Make this function take less arguments func New( stakingEnabled bool, + maxNonStakerPendingMsgs uint, stakerMsgPortion, stakerCPUPortion float64, log logging.Logger, @@ -186,29 +188,30 @@ func New( rtr.Initialize(log, &timeoutManager, gossipFrequency, shutdownTimeout) m := &manager{ - stakingEnabled: stakingEnabled, - stakerMsgPortion: stakerMsgPortion, - stakerCPUPortion: stakerCPUPortion, - log: log, - logFactory: logFactory, - vmManager: vmManager, - decisionEvents: decisionEvents, - consensusEvents: consensusEvents, - db: db, - chainRouter: rtr, - net: net, - timeoutManager: &timeoutManager, - consensusParams: consensusParams, - validators: validators, - nodeID: nodeID, - networkID: networkID, - server: server, - keystore: keystore, - atomicMemory: atomicMemory, - avaxAssetID: avaxAssetID, - xChainID: xChainID, - criticalChains: criticalChains, - chains: make(map[[32]byte]*router.Handler), + stakingEnabled: stakingEnabled, + maxNonStakerPendingMsgs: uint32(maxNonStakerPendingMsgs), + stakerMsgPortion: stakerMsgPortion, + stakerCPUPortion: stakerCPUPortion, + log: log, + logFactory: logFactory, + vmManager: vmManager, + decisionEvents: decisionEvents, + consensusEvents: consensusEvents, + db: db, + chainRouter: rtr, + net: net, + timeoutManager: &timeoutManager, + consensusParams: consensusParams, + validators: validators, + nodeID: nodeID, + networkID: networkID, + server: server, + keystore: keystore, + atomicMemory: atomicMemory, + avaxAssetID: avaxAssetID, + xChainID: xChainID, + criticalChains: criticalChains, + chains: make(map[[32]byte]*router.Handler), } m.Initialize() return m, nil @@ -511,6 +514,7 @@ func (m *manager) createAvalancheChain( validators, msgChan, defaultChannelSize, + m.maxNonStakerPendingMsgs, m.stakerMsgPortion, m.stakerCPUPortion, fmt.Sprintf("%s_handler", consensusParams.Namespace), @@ -589,6 +593,7 @@ func (m *manager) createSnowmanChain( validators, msgChan, defaultChannelSize, + m.maxNonStakerPendingMsgs, m.stakerMsgPortion, m.stakerCPUPortion, fmt.Sprintf("%s_handler", consensusParams.Namespace), diff --git a/main/params.go b/main/params.go index 8f477da7ac7b..239037a9b626 100644 --- a/main/params.go +++ b/main/params.go @@ -202,6 +202,7 @@ func init() { fs.Uint64Var(&Config.DisabledStakingWeight, "staking-disabled-weight", 1, "Weight to provide to each peer when staking is disabled") // Throttling: + fs.UintVar(&Config.MaxNonStakerPendingMsgs, "max-non-staker-pending-msgs", 3, "Maximum number of messages a non-staker is allowed to have pending.") fs.Float64Var(&Config.StakerMsgPortion, "staker-msg-reserved", 0.2, "Reserve a portion of the chain message queue's space for stakers.") fs.Float64Var(&Config.StakerCPUPortion, "staker-cpu-reserved", 0.2, "Reserve a portion of the chain's CPU time for stakers.") diff --git a/node/config.go b/node/config.go index aed3c9cec727..5e6ad9f8dcec 100644 --- a/node/config.go +++ b/node/config.go @@ -36,15 +36,16 @@ type Config struct { DB database.Database // Staking configuration - StakingIP utils.IPDesc - StakingLocalPort uint16 - EnableP2PTLS bool - 
EnableStaking bool - StakingKeyFile string - StakingCertFile string - DisabledStakingWeight uint64 - StakerMsgPortion float64 - StakerCPUPortion float64 + StakingIP utils.IPDesc + StakingLocalPort uint16 + EnableP2PTLS bool + EnableStaking bool + StakingKeyFile string + StakingCertFile string + DisabledStakingWeight uint64 + MaxNonStakerPendingMsgs uint + StakerMsgPortion float64 + StakerCPUPortion float64 // Bootstrapping configuration BootstrapPeers []*Peer diff --git a/node/node.go b/node/node.go index 960833a03e65..6c62b9add9b0 100644 --- a/node/node.go +++ b/node/node.go @@ -414,6 +414,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error { n.chainManager, err = chains.New( n.Config.EnableStaking, + n.Config.MaxNonStakerPendingMsgs, n.Config.StakerMsgPortion, n.Config.StakerCPUPortion, n.Log, diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index 3ab075a7b53b..ede7004d4b41 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" @@ -39,8 +40,9 @@ func TestShutdown(t *testing.T) { validators.NewSet(), nil, 1, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -97,8 +99,9 @@ func TestShutdownTimesOut(t *testing.T) { validators.NewSet(), nil, 1, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 278a6fbca244..2368e354d7a4 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -18,12 +18,6 @@ import ( "github.com/ava-labs/gecko/utils/timer" ) -const ( - // DefaultStakerPortion defines the default percentage of resources to - // allocate to stakers. - DefaultStakerPortion float64 = 0.2 -) - // Requirement: A set of nodes spamming messages (potentially costly) shouldn't // impact other node's queries. 
@@ -120,6 +114,7 @@ func (h *Handler) Initialize( validators validators.Set, msgChan <-chan common.Message, bufferSize int, + maxNonStakerPendingMsgs uint32, stakerMsgPortion, stakerCPUPortion float64, namespace string, @@ -159,6 +154,7 @@ func (h *Handler) Initialize( consumptionRanges, consumptionAllotments, bufferSize, + maxNonStakerPendingMsgs, cpuInterval, stakerMsgPortion, stakerCPUPortion, diff --git a/snow/networking/router/handler_test.go b/snow/networking/router/handler_test.go index cf904549cf00..3981b63c4885 100644 --- a/snow/networking/router/handler_test.go +++ b/snow/networking/router/handler_test.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" + "github.com/ava-labs/gecko/snow/networking/throttler" ) func TestHandlerDropsTimedOutMessages(t *testing.T) { @@ -41,8 +42,9 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { vdrs, nil, 16, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -83,8 +85,9 @@ func TestHandlerDoesntDrop(t *testing.T) { validators, nil, 16, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -118,8 +121,9 @@ func TestHandlerClosesOnError(t *testing.T) { validators.NewSet(), nil, 16, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/snow/networking/router/service_queue.go b/snow/networking/router/service_queue.go index e054129465b6..708f87c23d5f 100644 --- a/snow/networking/router/service_queue.go +++ b/snow/networking/router/service_queue.go @@ -31,8 +31,9 @@ type messageQueue interface { type multiLevelQueue struct { lock sync.Mutex - validators validators.Set - throttler throttler.Throttler + validators validators.Set + cpuTracker throttler.CPUTracker + msgThrottler throttler.CountingThrottler // Tracks total CPU consumption intervalConsumption, tierConsumption, cpuInterval time.Duration @@ -60,13 +61,15 @@ func newMultiLevelQueue( consumptionRanges []float64, consumptionAllotments []time.Duration, bufferSize int, + maxNonStakerPendingMsgs uint32, cpuInterval time.Duration, msgPortion, cpuPortion float64, ) (messageQueue, chan struct{}) { semaChan := make(chan struct{}, bufferSize) singleLevelSize := bufferSize / len(consumptionRanges) - throttler := throttler.NewEWMAThrottler(vdrs, uint32(bufferSize), msgPortion, cpuPortion, cpuInterval, log) + cpuTracker := throttler.NewEWMATracker(vdrs, cpuPortion, cpuInterval, log) + msgThrottler := throttler.NewMessageThrottler(vdrs, uint32(bufferSize), maxNonStakerPendingMsgs, msgPortion, log) queues := make([]singleLevelQueue, len(consumptionRanges)) for index := 0; index < len(queues); index++ { gauge, histogram, err := metrics.registerTierStatistics(index) @@ -84,7 +87,8 @@ func newMultiLevelQueue( return &multiLevelQueue{ validators: vdrs, - throttler: throttler, + cpuTracker: cpuTracker, + msgThrottler: msgThrottler, queues: queues, cpuRanges: consumptionRanges, cpuAllotments: consumptionAllotments, @@ -116,7 +120,7 @@ func (ml *multiLevelQueue) PushMessage(msg message) bool { return false } ml.pendingMessages++ - ml.throttler.AddMessage(msg.validatorID) + 
ml.msgThrottler.Add(msg.validatorID) select { case ml.semaChan <- struct{}{}: default: @@ -134,7 +138,7 @@ func (ml *multiLevelQueue) PopMessage() (message, error) { msg, err := ml.popMessage() if err == nil { ml.pendingMessages-- - ml.throttler.RemoveMessage(msg.validatorID) + ml.msgThrottler.Remove(msg.validatorID) ml.metrics.pending.Dec() } return msg, err @@ -145,7 +149,7 @@ func (ml *multiLevelQueue) UtilizeCPU(vdr ids.ShortID, duration time.Duration) { ml.lock.Lock() defer ml.lock.Unlock() - ml.throttler.UtilizeCPU(vdr, duration) + ml.cpuTracker.UtilizeCPU(vdr, duration) ml.intervalConsumption += duration ml.tierConsumption += duration if ml.tierConsumption > ml.cpuAllotments[ml.currentTier] { @@ -160,7 +164,8 @@ func (ml *multiLevelQueue) EndInterval() { ml.lock.Lock() defer ml.lock.Unlock() - ml.throttler.EndInterval() + ml.cpuTracker.EndInterval() + ml.msgThrottler.EndInterval() ml.metrics.cpu.Observe(float64(ml.intervalConsumption.Milliseconds())) ml.intervalConsumption = 0 } @@ -189,7 +194,7 @@ func (ml *multiLevelQueue) popMessage() (message, error) { ml.queues[ml.currentTier].waitingTime.Observe(float64(time.Since(msg.received))) // Check where messages from this validator currently belong - cpu, _ := ml.throttler.GetUtilization(msg.validatorID) + cpu := ml.cpuTracker.GetUtilization(msg.validatorID) correctIndex := ml.getPriorityIndex(cpu) // If the message is at least the priority of the current tier @@ -227,12 +232,12 @@ func (ml *multiLevelQueue) pushMessage(msg message) bool { ml.log.Warn("Dropping message due to invalid validatorID") return false } - cpu, throttle := ml.throttler.GetUtilization(validatorID) + throttle := ml.msgThrottler.Throttle(validatorID) if throttle { ml.metrics.throttled.Inc() return false } - + cpu := ml.cpuTracker.GetUtilization(validatorID) queueIndex := ml.getPriorityIndex(cpu) return ml.waterfallMessage(msg, queueIndex) diff --git a/snow/networking/router/service_queue_test.go b/snow/networking/router/service_queue_test.go index b79098864695..bad122e57579 100644 --- a/snow/networking/router/service_queue_test.go +++ b/snow/networking/router/service_queue_test.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" ) @@ -42,9 +43,10 @@ func setupMultiLevelQueue(t *testing.T, bufferSize int) (messageQueue, chan stru consumptionRanges, consumptionAllotments, bufferSize, + throttler.DefaultMaxNonStakerPendingMsgs, time.Second, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, ) return queue, semaChan, vdrs @@ -169,9 +171,10 @@ func TestMultiLevelQueuePrioritizes(t *testing.T) { consumptionRanges, consumptionAllotments, bufferSize, + throttler.DefaultMaxNonStakerPendingMsgs, time.Second, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, ) // Utilize CPU such that the next message from validator2 will be placed on a lower @@ -263,9 +266,10 @@ func TestMultiLevelQueuePushesDownOldMessages(t *testing.T) { consumptionRanges, consumptionAllotments, bufferSize, + throttler.DefaultMaxNonStakerPendingMsgs, time.Second, - DefaultStakerPortion, - DefaultStakerPortion, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, ) queue.PushMessage(message{ diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 
9b296b8f3f2e..26a2b91a37c7 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" "github.com/ava-labs/gecko/snow/networking/router" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" @@ -67,8 +68,9 @@ func TestTimeout(t *testing.T) { validators.NewSet(), nil, 1, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -123,8 +125,9 @@ func TestReliableMessages(t *testing.T) { validators.NewSet(), nil, 1, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -189,8 +192,9 @@ func TestReliableMessagesToMyself(t *testing.T) { validators.NewSet(), nil, 1, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/snow/networking/throttler/ewma.go b/snow/networking/throttler/ewma.go index 8d33610069d0..9cfc0169a201 100644 --- a/snow/networking/throttler/ewma.go +++ b/snow/networking/throttler/ewma.go @@ -15,52 +15,39 @@ import ( ) const ( - defaultDecayFactor float64 = 2 - defaultIntervalsUntilPruning uint32 = 60 - defaultMinimumCPUAllotment = time.Nanosecond + defaultDecayFactor float64 = 2 + defaultMinimumCPUAllotment = time.Nanosecond ) -type ewmaThrottler struct { +type ewmaCPUTracker struct { lock sync.Mutex log logging.Logger // Track peers - spenders map[[20]byte]*spender + cpuSpenders map[[20]byte]*cpuSpender cumulativeEWMA time.Duration vdrs validators.Set // Track CPU utilization - decayFactor float64 - stakerCPU, nonReservedCPU time.Duration - - // Track pending messages - reservedStakerMessages uint32 - pendingNonReservedMsgs, nonReservedMsgs uint32 - maxNonStakerPendingMsgs uint32 - - // Statistics adjusted at every interval - currentPeriod uint32 + decayFactor float64 // Factor used to discount the EWMA at every period + stakerCPU time.Duration // Amount of CPU time reserved for stakers + nonReservedCPU time.Duration // Amount of CPU time that is not reserved for stakers } -// NewEWMAThrottler returns a Throttler that uses exponentially weighted moving +// NewEWMATracker returns a CPUTracker that uses exponentially weighted moving // average to estimate CPU utilization. // -// [maxMessages] is the maximum number of messages allotted to this chain -// [stakerMsgPortion] is the portion of messages to reserve exclusively for stakers -// [stakerCPUPortion] is the portion of CPU utilization to reserve for stakers -// both staker portions should be in the range (0, 1] +// [stakerCPUPortion] is the portion of CPU utilization to reserve for stakers (range (0, 1]) // [period] is the interval of time to use for the calculation of EWMA // -// Note: ewmaThrottler uses the period as the total amount of time per interval, +// Note: ewmaCPUTracker uses the period as the total amount of time per interval, // which is not the limit since it tracks consumption using EWMA. 
-func NewEWMAThrottler( +func NewEWMATracker( vdrs validators.Set, - maxMessages uint32, - stakerMsgPortion, stakerCPUPortion float64, period time.Duration, log logging.Logger, -) Throttler { +) CPUTracker { // Amount of CPU time reserved for processing messages from stakers stakerCPU := time.Duration(float64(period) * stakerCPUPortion) if stakerCPU < defaultMinimumCPUAllotment { @@ -75,67 +62,30 @@ func NewEWMAThrottler( nonReservedCPU = defaultMinimumCPUAllotment } - // Number of messages reserved for Stakers vs. Non-Stakers - reservedStakerMessages := uint32(stakerMsgPortion * float64(maxMessages)) - nonReservedMsgs := maxMessages - reservedStakerMessages - - throttler := &ewmaThrottler{ - spenders: make(map[[20]byte]*spender), - vdrs: vdrs, - log: log, + throttler := &ewmaCPUTracker{ + cpuSpenders: make(map[[20]byte]*cpuSpender), + vdrs: vdrs, + log: log, decayFactor: defaultDecayFactor, stakerCPU: stakerCPU, nonReservedCPU: nonReservedCPU, - - reservedStakerMessages: reservedStakerMessages, - nonReservedMsgs: nonReservedMsgs, } - // Add validators to spenders, so that they will be calculated correctly in + // Add validators to cpuSpenders, so that they will be calculated correctly in // EndInterval for _, vdr := range vdrs.List() { - throttler.spenders[vdr.ID().Key()] = &spender{} + throttler.cpuSpenders[vdr.ID().Key()] = &cpuSpender{} } // Call EndInterval to calculate initial period statistics and initial - // spender values for validators + // cpuSpender values for validators throttler.EndInterval() return throttler } -func (et *ewmaThrottler) AddMessage(validatorID ids.ShortID) { - et.lock.Lock() - defer et.lock.Unlock() - - sp := et.getSpender(validatorID) - sp.pendingMessages++ - - // If the spender has exceeded its message allotment, then the additional - // message is taken from the pool - if sp.pendingMessages > sp.msgAllotment { - sp.pendingPoolMessages++ - et.pendingNonReservedMsgs++ - } -} - -func (et *ewmaThrottler) RemoveMessage(validatorID ids.ShortID) { - et.lock.Lock() - defer et.lock.Unlock() - - sp := et.getSpender(validatorID) - sp.pendingMessages-- - - // If the spender has pending messages taken from the pool, - // they are the first messages to be removed. - if sp.pendingPoolMessages > 0 { - sp.pendingPoolMessages-- - et.pendingNonReservedMsgs-- - } -} - -func (et *ewmaThrottler) UtilizeCPU( +func (et *ewmaCPUTracker) UtilizeCPU( validatorID ids.ShortID, consumption time.Duration, ) { @@ -144,140 +94,86 @@ func (et *ewmaThrottler) UtilizeCPU( sp := et.getSpender(validatorID) sp.cpuEWMA += consumption - sp.lastSpend = et.currentPeriod et.cumulativeEWMA += consumption } -// Returns CPU GetUtilization metric as percentage of expected utilization and -// boolean specifying whether or not the validator has exceeded its message -// allotment. -func (et *ewmaThrottler) GetUtilization( - validatorID ids.ShortID, -) (float64, bool) { +// GetUtilization returns a percentage of expected CPU utilization of the peer +// corresponding to [validatorID] +func (et *ewmaCPUTracker) GetUtilization(validatorID ids.ShortID) float64 { et.lock.Lock() defer et.lock.Unlock() sharedUtilization := float64(et.cumulativeEWMA) / float64(et.nonReservedCPU) sp := et.getSpender(validatorID) if !sp.staking { - exceedsMessageAllotment := et.pendingNonReservedMsgs > et.nonReservedMsgs || // the shared message pool has been taken - sp.pendingMessages > sp.maxMessages // exceeds its own individual message cap - - if exceedsMessageAllotment { - et.log.Verbo("Throttling non-staker %s: %s. 
Pending pool messages: %d/%d.", - validatorID, - sp, - et.pendingNonReservedMsgs, - et.nonReservedMsgs) - } - return sharedUtilization, exceedsMessageAllotment - } - - // Staker should only be throttled if it has exceeded its message allotment - // and there are either no messages left in the shared pool or it has - // exceeded its own maximum message allocation. - exceedsMessageAllotment := sp.pendingMessages > sp.msgAllotment && // exceeds its own individual message allotment - (et.pendingNonReservedMsgs > et.nonReservedMsgs || // no unreserved messages - sp.pendingMessages > sp.maxMessages) // exceeds its own individual message cap - - if exceedsMessageAllotment { - et.log.Debug("Throttling staker %s: %s. Pending pool messages: %d/%d.", - validatorID, - sp, - et.pendingNonReservedMsgs, - et.nonReservedMsgs) + return sharedUtilization } - return math.Min(float64(sp.cpuEWMA)/float64(sp.expectedCPU), sharedUtilization), exceedsMessageAllotment + return math.Min(float64(sp.cpuEWMA)/float64(sp.expectedCPU), sharedUtilization) } -func (et *ewmaThrottler) EndInterval() { +// EndInterval registers the end of a given CPU interval by discounting +// all cpuSpenders' cpuEWMA and removing outstanding spenders that have sufficiently +// low cpuEWMA stats +func (et *ewmaCPUTracker) EndInterval() { et.lock.Lock() defer et.lock.Unlock() - et.currentPeriod++ - et.cumulativeEWMA = time.Duration(float64(et.cumulativeEWMA) / et.decayFactor) stakingWeight := et.vdrs.Weight() - numPeers := et.vdrs.Len() + 1 - et.maxNonStakerPendingMsgs = et.nonReservedMsgs / uint32(numPeers) - for key, spender := range et.spenders { - spender.cpuEWMA = time.Duration(float64(spender.cpuEWMA) / et.decayFactor) + removed := 0 + for key, cpuSpender := range et.cpuSpenders { + cpuSpender.cpuEWMA = time.Duration(float64(cpuSpender.cpuEWMA) / et.decayFactor) if weight, ok := et.vdrs.GetWeight(ids.NewShortID(key)); ok { stakerPortion := float64(weight) / float64(stakingWeight) // Calculate staker allotment here - spender.staking = true - spender.msgAllotment = uint32(float64(et.reservedStakerMessages) * stakerPortion) - spender.maxMessages = uint32(float64(et.reservedStakerMessages)*stakerPortion) + et.maxNonStakerPendingMsgs - spender.expectedCPU = time.Duration(float64(et.stakerCPU)*stakerPortion) + defaultMinimumCPUAllotment + cpuSpender.staking = true + cpuSpender.expectedCPU = time.Duration(float64(et.stakerCPU)*stakerPortion) + defaultMinimumCPUAllotment continue } - if spender.lastSpend+defaultIntervalsUntilPruning < et.currentPeriod && spender.pendingMessages == 0 { - et.log.Debug("Removing validator from throttler after not hearing from it for %d periods", - et.currentPeriod-spender.lastSpend) - delete(et.spenders, key) + if cpuSpender.cpuEWMA == 0 { + removed++ + delete(et.cpuSpenders, key) } - // If the validator is not a staker and was not deleted, set its spender + // If the validator is not a staker and was not deleted, set its cpuSpender // attributes - spender.staking = false - spender.msgAllotment = 0 - spender.maxMessages = et.maxNonStakerPendingMsgs - spender.expectedCPU = defaultMinimumCPUAllotment + cpuSpender.staking = false + cpuSpender.expectedCPU = defaultMinimumCPUAllotment } + et.log.Debug("Removed %d validators from CPU Tracker.", removed) } -// getSpender returns the [spender] corresponding to [validatorID] -func (et *ewmaThrottler) getSpender(validatorID ids.ShortID) *spender { +// getSpender returns the [cpuSpender] corresponding to [validatorID] +func (et *ewmaCPUTracker) getSpender(validatorID 
ids.ShortID) *cpuSpender { validatorKey := validatorID.Key() - if sp, exists := et.spenders[validatorKey]; exists { + if sp, exists := et.cpuSpenders[validatorKey]; exists { return sp } - // If this validator did not exist in spenders, create it and return - sp := &spender{ - maxMessages: et.maxNonStakerPendingMsgs, + // If this validator did not exist in cpuSpenders, create it and return + sp := &cpuSpender{ expectedCPU: defaultMinimumCPUAllotment, } - et.spenders[validatorKey] = sp + et.cpuSpenders[validatorKey] = sp return sp } -type spender struct { - // Last period that this spender utilized the CPU - lastSpend uint32 - - // Number of pending messages this spender has taken from the pool - pendingPoolMessages uint32 - - // Number of messages this spender currently has pending - pendingMessages uint32 - - // Number of messages allocated to this spender as a staker - msgAllotment uint32 - - // Max number of messages this spender can use even if the shared pool is - // non-empty - maxMessages uint32 - - // EWMA of this spender's CPU utilization +type cpuSpender struct { + // EWMA of this cpuSpender's CPU utilization cpuEWMA time.Duration // The expected CPU utilization of this peer expectedCPU time.Duration - // Flag to indicate if this spender is a staker + // Flag to indicate if this cpuSpender is a staker staking bool } -func (sp *spender) String() string { - return fmt.Sprintf("Spender(Messages: (%d+%d)/(%d+%d), CPU: %s/%s)", - sp.pendingPoolMessages, - sp.pendingMessages-sp.pendingPoolMessages, - sp.msgAllotment, - sp.maxMessages-sp.msgAllotment, +func (sp *cpuSpender) String() string { + return fmt.Sprintf("CPUTracker(CPU: %s/%s)", sp.cpuEWMA, sp.expectedCPU, ) diff --git a/snow/networking/throttler/message_throttler.go b/snow/networking/throttler/message_throttler.go new file mode 100644 index 000000000000..7d47dcf72b72 --- /dev/null +++ b/snow/networking/throttler/message_throttler.go @@ -0,0 +1,221 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package throttler + +import ( + "fmt" + "sync" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/snow/validators" + "github.com/ava-labs/gecko/utils/logging" +) + +const ( + defaultIntervalsUntilPruning uint32 = 60 +) + +type messageThrottler struct { + lock sync.Mutex + log logging.Logger + + // Track peers + msgSpenders map[[20]byte]*msgSpender + vdrs validators.Set + + // Track pending messages + reservedStakerMessages uint32 // Number of messages reserved for stakers + nonReservedMsgs uint32 // Number of non-reserved messages left to a shared message pool + pendingNonReservedMsgs uint32 // Number of pending messages taken from the shared message pool + + // Cap on number of pending messages allowed to a non-staker + maxNonStakerPendingMsgs uint32 + + // Statistics adjusted at every interval + currentPeriod uint32 +} + +// NewMessageThrottler returns a MessageThrottler that throttles peers +// when they have too many pending messages outstanding. +// +// [maxMessages] is the maximum number of messages allotted to this chain +// [stakerMsgPortion] is the portion of messages to reserve exclusively for stakers +// should be in the range (0, 1] +func NewMessageThrottler( + vdrs validators.Set, + maxMessages, + maxNonStakerPendingMsgs uint32, + stakerMsgPortion float64, + log logging.Logger, +) CountingThrottler { + // Number of messages reserved for Stakers vs. 
Non-Stakers + reservedStakerMessages := uint32(stakerMsgPortion * float64(maxMessages)) + nonReservedMsgs := maxMessages - reservedStakerMessages + + throttler := &messageThrottler{ + msgSpenders: make(map[[20]byte]*msgSpender), + vdrs: vdrs, + log: log, + + reservedStakerMessages: reservedStakerMessages, + nonReservedMsgs: nonReservedMsgs, + maxNonStakerPendingMsgs: maxNonStakerPendingMsgs, + } + + // Add validators to msgSpenders, so that they will be calculated correctly in + // EndInterval + for _, vdr := range vdrs.List() { + throttler.msgSpenders[vdr.ID().Key()] = &msgSpender{} + } + + // Call EndInterval to calculate initial period statistics and initial + // msgSpender values for validators + throttler.EndInterval() + return throttler +} + +func (et *messageThrottler) Add(validatorID ids.ShortID) { + et.lock.Lock() + defer et.lock.Unlock() + + sp := et.getSpender(validatorID) + sp.pendingMessages++ + sp.lastSpend = et.currentPeriod + + // If the msgSpender has exceeded its message allotment, then the additional + // message is taken from the pool + if sp.pendingMessages > sp.msgAllotment { + sp.pendingPoolMessages++ + et.pendingNonReservedMsgs++ + } +} + +func (et *messageThrottler) Remove(validatorID ids.ShortID) { + et.lock.Lock() + defer et.lock.Unlock() + + sp := et.getSpender(validatorID) + sp.pendingMessages-- + + // If the msgSpender has pending messages taken from the pool, + // they are the first messages to be removed. + if sp.pendingPoolMessages > 0 { + sp.pendingPoolMessages-- + et.pendingNonReservedMsgs-- + } +} + +// Throttle returns true if messages from [validatorID] should be throttled due +// to having too many pending messages +func (et *messageThrottler) Throttle( + validatorID ids.ShortID, +) bool { + et.lock.Lock() + defer et.lock.Unlock() + + sp := et.getSpender(validatorID) + if !sp.staking { + exceedsMessageAllotment := et.pendingNonReservedMsgs > et.nonReservedMsgs || // the shared message pool has been taken + (sp.pendingMessages > sp.maxMessages) // Spender has exceeded its individual cap + + if exceedsMessageAllotment { + et.log.Verbo("Throttling non-staker %s: %s. Pending pool messages: %d/%d.", + validatorID, + sp, + et.pendingNonReservedMsgs, + et.nonReservedMsgs) + } + return exceedsMessageAllotment + } + + exceedsMessageAllotment := sp.pendingMessages > sp.msgAllotment && // Throttle if the staker has exceeded its allotment + (et.pendingNonReservedMsgs > et.nonReservedMsgs || // And either the shared message pool is empty + sp.pendingMessages > sp.maxMessages) // or this staker has exceeded its individual cap + + if exceedsMessageAllotment { + et.log.Debug("Throttling staker %s: %s. 
Pending pool messages: %d/%d.", + validatorID, + sp, + et.pendingNonReservedMsgs, + et.nonReservedMsgs) + } + return exceedsMessageAllotment +} + +func (et *messageThrottler) EndInterval() { + et.lock.Lock() + defer et.lock.Unlock() + + et.currentPeriod++ + stakingWeight := et.vdrs.Weight() + + for key, msgSpender := range et.msgSpenders { + if weight, exists := et.vdrs.GetWeight(ids.NewShortID(key)); exists { + stakerPortion := float64(weight) / float64(stakingWeight) + + // Calculate staker allotment here + msgSpender.staking = true + msgSpender.msgAllotment = uint32(float64(et.reservedStakerMessages) * stakerPortion) + msgSpender.maxMessages = msgSpender.msgAllotment + et.maxNonStakerPendingMsgs + continue + } + + if msgSpender.lastSpend+defaultIntervalsUntilPruning < et.currentPeriod && msgSpender.pendingMessages == 0 { + et.log.Debug("Removing validator from throttler after not hearing from it for %d periods", + et.currentPeriod-msgSpender.lastSpend) + delete(et.msgSpenders, key) + } + + // If the validator is not a staker and was not deleted, set its msgSpender + // attributes + msgSpender.staking = false + msgSpender.msgAllotment = 0 + msgSpender.maxMessages = et.maxNonStakerPendingMsgs + } +} + +// getSpender returns the [msgSpender] corresponding to [validatorID] +func (et *messageThrottler) getSpender(validatorID ids.ShortID) *msgSpender { + validatorKey := validatorID.Key() + if sp, exists := et.msgSpenders[validatorKey]; exists { + return sp + } + + // If this validator did not exist in msgSpenders, create it and return + sp := &msgSpender{ + maxMessages: et.maxNonStakerPendingMsgs, + } + et.msgSpenders[validatorKey] = sp + return sp +} + +type msgSpender struct { + // Last period that this msgSpender utilized the CPU + lastSpend uint32 + + // Number of pending messages this msgSpender has taken from the pool + pendingPoolMessages uint32 + + // Number of messages this msgSpender currently has pending + pendingMessages uint32 + + // Number of messages allocated to this msgSpender as a staker + msgAllotment uint32 + + // Max number of messages this msgSpender can use even if the shared pool is + // non-empty + maxMessages uint32 + + // Flag to indicate if this msgSpender is a staker + staking bool +} + +func (sp *msgSpender) String() string { + return fmt.Sprintf("MsgSpender(Messages: (%d+%d)/(%d+%d))", + sp.pendingPoolMessages, + sp.pendingMessages-sp.pendingPoolMessages, + sp.msgAllotment, + sp.maxMessages-sp.msgAllotment, + ) +} diff --git a/snow/networking/throttler/no.go b/snow/networking/throttler/no.go index 02f6dc18ae79..f734336375b9 100644 --- a/snow/networking/throttler/no.go +++ b/snow/networking/throttler/no.go @@ -9,17 +9,27 @@ import ( "github.com/ava-labs/gecko/ids" ) -type noThrottler struct{} +type noCountThrottler struct{} -func (noThrottler) AddMessage(ids.ShortID) {} +func (noCountThrottler) Add(ids.ShortID) {} -func (noThrottler) RemoveMessage(ids.ShortID) {} +func (noCountThrottler) Remove(ids.ShortID) {} -func (noThrottler) UtilizeCPU(ids.ShortID, time.Duration) {} +func (noCountThrottler) Throttle(ids.ShortID) bool { return false } -func (noThrottler) GetUtilization(ids.ShortID) (float64, bool) { return 0, false } +func (noCountThrottler) EndInterval() {} -func (noThrottler) EndInterval() {} +// NewNoCountThrottler returns a CountingThrottler that will never throttle +func NewNoCountThrottler() CountingThrottler { return noCountThrottler{} } -// NewNoThrottler returns a throttler that will never throttle -func NewNoThrottler() Throttler { return 
noThrottler{} } +type noCPUTracker struct{} + +func (noCPUTracker) UtilizeCPU(ids.ShortID, time.Duration) {} + +func (noCPUTracker) GetUtilization(ids.ShortID) float64 { return 0 } + +func (noCPUTracker) EndInterval() {} + +// NewNoCPUTracker returns a CPUTracker that does not track CPU usage and +// always returns 0 for the utilization value +func NewNoCPUTracker() CPUTracker { return noCPUTracker{} } diff --git a/snow/networking/throttler/throttler.go b/snow/networking/throttler/throttler.go index 3c0ef9679bab..39d4bcd4c0bb 100644 --- a/snow/networking/throttler/throttler.go +++ b/snow/networking/throttler/throttler.go @@ -9,6 +9,16 @@ import ( "github.com/ava-labs/gecko/ids" ) +const ( + // DefaultMaxNonStakerPendingMsgs rate limits the number of queued messages + // from non-stakers. + DefaultMaxNonStakerPendingMsgs uint32 = 3 + + // DefaultStakerPortion describes the percentage of resources that are + // reserved for stakers. + DefaultStakerPortion float64 = 0.2 +) + // Throttler provides an interface to register consumption // of resources and prioritize messages from nodes that have // used less CPU time. @@ -19,3 +29,19 @@ type Throttler interface { GetUtilization(ids.ShortID) (float64, bool) // Returns the CPU based priority and whether or not the peer has too many pending messages EndInterval() // Notify throttler that the current period has ended } + +// CPUTracker tracks the consumption of CPU time +type CPUTracker interface { + UtilizeCPU(ids.ShortID, time.Duration) + GetUtilization(ids.ShortID) float64 + EndInterval() +} + +// CountingThrottler tracks the usage of a discrete resource (ex. pending messages) by a peer +// and determines whether or not a peer should be throttled. +type CountingThrottler interface { + Add(ids.ShortID) + Remove(ids.ShortID) + Throttle(ids.ShortID) bool + EndInterval() +} diff --git a/snow/networking/throttler/throttler_test.go b/snow/networking/throttler/throttler_test.go index 3b16d64a14fd..9a77b4b6392e 100644 --- a/snow/networking/throttler/throttler_test.go +++ b/snow/networking/throttler/throttler_test.go @@ -12,50 +12,83 @@ import ( "github.com/ava-labs/gecko/utils/logging" ) -func TestEWMAThrottler(t *testing.T) { +func TestEWMATrackerPrioritizes(t *testing.T) { vdrs := validators.NewSet() vdr0 := ids.GenerateTestShortID() vdr1 := ids.GenerateTestShortID() + nonStaker := ids.GenerateTestShortID() vdrs.AddWeight(vdr0, 1) vdrs.AddWeight(vdr1, 1) - maxMessages := uint32(16) - msgPortion := 0.25 cpuPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewEWMATracker(vdrs, cpuPortion, period, logging.NoLog{}) throttler.UtilizeCPU(vdr0, 25*time.Millisecond) throttler.UtilizeCPU(vdr1, 5*time.Second) - cpu0, throttle0 := throttler.GetUtilization(vdr0) - cpu1, throttle1 := throttler.GetUtilization(vdr1) - - if throttle0 { - t.Fatalf("Should not throttle vdr0 with no pending messages") - } - if throttle1 { - t.Fatalf("Should not throttle vdr1 with no pending messages") - } + cpu0 := throttler.GetUtilization(vdr0) + cpu1 := throttler.GetUtilization(vdr1) + cpuNonStaker := throttler.GetUtilization(nonStaker) if cpu1 <= cpu0 { t.Fatalf("CPU utilization for vdr1: %f should be greater than that of vdr0: %f", cpu1, cpu0) } - // Test that throttler prevents unknown validators from taking up half the message queue - for i := uint32(0); i < maxMessages; i++ { - throttler.AddMessage(ids.NewShortID([20]byte{byte(i)})) + if cpuNonStaker < cpu1 { + t.Fatalf("CPU 
Utilization for non-staker: %f should be greater than or equal to the CPU Utilization for the highest spending staker: %f", cpuNonStaker, cpu1) + } +} + +func TestEWMATrackerPrunesSpenders(t *testing.T) { + vdrs := validators.NewSet() + + staker0 := ids.GenerateTestShortID() + staker1 := ids.GenerateTestShortID() + nonStaker0 := ids.GenerateTestShortID() + nonStaker1 := ids.GenerateTestShortID() + + vdrs.AddWeight(staker0, 1) + vdrs.AddWeight(staker1, 1) + + cpuPortion := 0.25 + period := time.Second + throttler := NewEWMATracker(vdrs, cpuPortion, period, logging.NoLog{}) + + throttler.UtilizeCPU(staker0, 1.0) + throttler.UtilizeCPU(nonStaker0, 1.0) + + // 3 Cases: + // Stakers should not be pruned + // Non-stakers with non-zero cpuEWMA should not be pruned + // Non-stakers with cpuEWMA of 0 should be pruned + + // After 64 intervals nonStaker0 should be removed because its cpuEWMA statistic should reach 0 + // while nonStaker1 utilizes the CPU in every interval, so it should not be removed. + for i := 0; i < 64; i++ { + throttler.UtilizeCPU(nonStaker1, 1.0) + throttler.EndInterval() } - _, throttle := throttler.GetUtilization(ids.NewShortID([20]byte{'s', 'y', 'b', 'i', 'l'})) - if !throttle { - t.Fatal("Throttler should have started throttling messages from unknown peers") + // Ensure that the validators and the non-staker heard from every interval were not pruned + ewmat := throttler.(*ewmaCPUTracker) + if _, ok := ewmat.cpuSpenders[staker0.Key()]; !ok { + t.Fatal("Staker was pruned from the set of spenders") + } + if _, ok := ewmat.cpuSpenders[staker1.Key()]; !ok { + t.Fatal("Staker was pruned from the set of spenders") + } + if _, ok := ewmat.cpuSpenders[nonStaker0.Key()]; ok { + t.Fatal("Non-staker, not heard from in 64 periods, should have been pruned from the set of spenders") + } + if _, ok := ewmat.cpuSpenders[nonStaker1.Key()]; ok { + t.Fatal("Non-staker heard from in every period, was pruned from the set of spenders") } } -func TestThrottlerPrunesSpenders(t *testing.T) { +func TestMessageThrottlerPrunesSpenders(t *testing.T) { vdrs := validators.NewSet() staker0 := ids.GenerateTestShortID() @@ -68,83 +101,122 @@ func TestThrottlerPrunesSpenders(t *testing.T) { vdrs.AddWeight(staker1, 1) maxMessages := uint32(1024) - cpuPortion := 0.25 msgPortion := 0.25 - period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, cpuPortion, period, logging.NoLog{}) - throttler.AddMessage(nonStaker2) // nonStaker2 should not be removed with a pending message - throttler.UtilizeCPU(nonStaker0, 1.0) - throttler.UtilizeCPU(nonStaker1, 1.0) - intervalsUntilPruning := int(defaultIntervalsUntilPruning) - // Let two intervals pass with no activity to ensure that nonStaker1 can be pruned + throttler := NewMessageThrottler(vdrs, maxMessages, DefaultMaxNonStakerPendingMsgs, msgPortion, logging.NoLog{}) + + // 4 Cases: + // Stakers should not be pruned + // Non-stakers with pending messages should not be pruned + // Non-stakers heard from recently should not be pruned + // Non-stakers not heard from in [defaultIntervalsUntilPruning] should be pruned + + // Add pending messages for nonStaker1 and nonStaker2 + throttler.Add(nonStaker2) // Will not be removed, so it should not be pruned + throttler.Add(nonStaker1) + throttler.EndInterval() + throttler.Remove(nonStaker1) // The pending message was removed, so nonStaker1 should be pruned throttler.EndInterval() - throttler.UtilizeCPU(nonStaker0, 1.0) + intervalsUntilPruning := int(defaultIntervalsUntilPruning) // Let the required 
number of intervals elapse to allow nonStaker1 to be pruned for i := 0; i < intervalsUntilPruning; i++ { + throttler.Add(nonStaker0) // nonStaker0 is heard from in every interval, so it should not be pruned throttler.EndInterval() + throttler.Remove(nonStaker0) } - // Ensure that the validators and the non-staker heard from in the past [intervalsUntilPruning] were not pruned - ewmat := throttler.(*ewmaThrottler) - if _, ok := ewmat.spenders[staker0.Key()]; !ok { + msgThrottler := throttler.(*messageThrottler) + if _, ok := msgThrottler.msgSpenders[staker0.Key()]; !ok { t.Fatal("Staker was pruned from the set of spenders") } - if _, ok := ewmat.spenders[staker1.Key()]; !ok { + if _, ok := msgThrottler.msgSpenders[staker1.Key()]; !ok { t.Fatal("Staker was pruned from the set of spenders") } - if _, ok := ewmat.spenders[nonStaker0.Key()]; !ok { - t.Fatal("Non-staker heard from recently was pruned from the set of spenders") + if _, ok := msgThrottler.msgSpenders[nonStaker0.Key()]; !ok { + t.Fatal("Non-staker heard from within [intervalsUntilPruning] was removed from the set of spenders") } - if _, ok := ewmat.spenders[nonStaker1.Key()]; ok { - t.Fatal("Non-staker not heard from in a long time was not pruned from the set of spenders") + if _, ok := msgThrottler.msgSpenders[nonStaker1.Key()]; ok { + t.Fatal("Non-staker not heard from within [intervalsUntilPruning] was not removed from the set of spenders") } - if _, ok := ewmat.spenders[nonStaker2.Key()]; !ok { + if _, ok := msgThrottler.msgSpenders[nonStaker2.Key()]; !ok { t.Fatal("Non-staker with a pending message was pruned from the set of spenders") } } -func TestThrottleStaker(t *testing.T) { +func TestMessageThrottling(t *testing.T) { vdrs := validators.NewSet() staker0 := ids.GenerateTestShortID() staker1 := ids.GenerateTestShortID() nonStaker0 := ids.GenerateTestShortID() + nonStaker1 := ids.GenerateTestShortID() vdrs.AddWeight(staker0, 1) vdrs.AddWeight(staker1, 1) - maxMessages := uint32(16) + maxMessages := uint32(8) msgPortion := 0.25 - cpuPortion := 0.25 - period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, cpuPortion, period, logging.NoLog{}) + throttler := NewMessageThrottler(vdrs, maxMessages, DefaultMaxNonStakerPendingMsgs, msgPortion, logging.NoLog{}) - // Message Allotment: 0.5 * 0.25 * 15 = 2 - // Message Pool: 12 messages - // Validator should be throttled iff it has exceeded its message allotment and the shared - // message pool is empty + // Message Allotment: 0.5 * 0.25 * 8 = 1 + // Message Pool: 8 * 0.75 = 6 messages + // Max Messages: 1 + DefaultMaxNonStakerPendingMsgs + // Validator should be throttled if it has exceeded its max messages + // or it has exceeded its message allotment and the shared message pool is empty. 
- // staker0 consumes its own allotment plus 10 messages from the shared pool - for i := 0; i < 12; i++ { - throttler.AddMessage(staker0) - } + // staker0 consumes its entire message allotment - for i := 0; i < 3; i++ { - throttler.AddMessage(staker1) - if _, throttle := throttler.GetUtilization(staker1); throttle { + // Ensure that it is allowed to consume its entire max messages before being throttled + for i := 0; i < int(DefaultMaxNonStakerPendingMsgs)+1; i++ { + throttler.Add(staker0) + if throttler.Throttle(staker0) { t.Fatal("Should not throttle message from staker until it has exceeded its own allotment") } } - // Consume the last message and one extra message from the shared pool - throttler.AddMessage(nonStaker0) - throttler.AddMessage(nonStaker0) - throttler.AddMessage(nonStaker0) + // Ensure staker is throttled after exceeding its own max messages cap + throttler.Add(staker0) + if !throttler.Throttle(staker0) { + t.Fatal("Should have throttled message after exceeding message cap") + } + + // Remove messages to reduce staker0 to have its normal message allotment in pending + for i := 0; i < int(DefaultMaxNonStakerPendingMsgs)+1; i++ { + throttler.Remove(staker0) + } + + // Consume the entire message pool among two non-stakers + for i := 0; i < int(DefaultMaxNonStakerPendingMsgs); i++ { + throttler.Add(nonStaker0) + throttler.Add(nonStaker1) + + // Neither should be throttled because they are only consuming up to their own message cap + // and the shared pool has been emptied. + if throttler.Throttle(nonStaker0) { + t.Fatalf("Should not have throttled message from nonStaker0 after %d messages", i) + } + if throttler.Throttle(nonStaker1) { + t.Fatalf("Should not have throttled message from nonStaker1 after %d messages", i) + } + } + + // An additional message from staker0 should now cause it to be throttled since the message pool + // has been emptied. 
+ if throttler.Throttle(staker0) { + t.Fatal("Should not have throttled message from staker until it had exceeded its message allotment.") + } + throttler.Add(staker0) + if !throttler.Throttle(staker0) { + t.Fatal("Should have throttled message from staker0 after it exceeded its message allotment because the message pool was empty.") + } - if _, throttle := throttler.GetUtilization(staker1); !throttle { - t.Fatal("Should have throttled message from staker after it exceeded its own allotment and the shared pool was empty") + if !throttler.Throttle(nonStaker0) { + t.Fatal("Should have throttled message from nonStaker0 after the message pool was emptied") + } + + if !throttler.Throttle(nonStaker1) { + t.Fatal("Should have throttled message from nonStaker1 after the message pool was emptied") } } @@ -157,11 +229,9 @@ func TestCalculatesEWMA(t *testing.T) { vdrs.AddWeight(vdr0, 1) vdrs.AddWeight(vdr1, 1) - maxMessages := uint32(16) - msgPortion := 0.25 stakerPortion := 0.25 period := time.Second - throttler := NewEWMAThrottler(vdrs, maxMessages, msgPortion, stakerPortion, period, logging.NoLog{}) + throttler := NewEWMATracker(vdrs, stakerPortion, period, logging.NoLog{}) // Spend X CPU time in consecutive intervals and ensure that the throttler correctly calculates EWMA spends := []time.Duration{ @@ -182,7 +252,7 @@ func TestCalculatesEWMA(t *testing.T) { throttler.EndInterval() } - ewmat := throttler.(*ewmaThrottler) + ewmat := throttler.(*ewmaCPUTracker) sp := ewmat.getSpender(vdr0) if sp.cpuEWMA != ewma { t.Fatalf("EWMA Throttler calculated EWMA incorrectly, expected: %s, but calculated: %s", ewma, sp.cpuEWMA) diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 2e63892455d3..c29b12796ad9 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -28,6 +28,7 @@ import ( "github.com/ava-labs/gecko/snow/engine/snowman/bootstrap" "github.com/ava-labs/gecko/snow/networking/router" "github.com/ava-labs/gecko/snow/networking/sender" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/constants" @@ -1584,8 +1585,9 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { vdrs, msgChan, 1000, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) diff --git a/vms/spchainvm/consensus_benchmark_test.go b/vms/spchainvm/consensus_benchmark_test.go index 717fa5ccb1fe..1f73ed5fd105 100644 --- a/vms/spchainvm/consensus_benchmark_test.go +++ b/vms/spchainvm/consensus_benchmark_test.go @@ -18,6 +18,7 @@ import ( "github.com/ava-labs/gecko/snow/engine/snowman/bootstrap" "github.com/ava-labs/gecko/snow/networking/router" "github.com/ava-labs/gecko/snow/networking/sender" + "github.com/ava-labs/gecko/snow/networking/throttler" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/utils/logging" @@ -111,8 +112,9 @@ func ConsensusLeader(numBlocks, numTxsPerBlock int, b *testing.B) { vdrs, msgChan, 1000, - router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), ) @@ -253,8 +255,9 @@ func ConsensusFollower(numBlocks, numTxsPerBlock int, b *testing.B) { vdrs, msgChan, 1000, - 
router.DefaultStakerPortion, - router.DefaultStakerPortion, + throttler.DefaultMaxNonStakerPendingMsgs, + throttler.DefaultStakerPortion, + throttler.DefaultStakerPortion, "", prometheus.NewRegistry(), )
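
For readers skimming the patch, the core of the refactor is the split of the old Throttler into two independent pieces: an EWMA-based CPUTracker used only to prioritize the multi-level queue, and a CountingThrottler that decides throttling purely from pending-message counts (a per-staker allotment plus a shared pool, with non-stakers capped at maxNonStakerPendingMsgs). The sketch below is a minimal, self-contained illustration of that accounting, not the repository's API: the cpuTracker/msgThrottler types, the weights map standing in for validators.Set, and the values in main are illustrative assumptions (it also omits the cumulative/shared-utilization term the real tracker keeps).

```go
package main

import (
	"fmt"
	"time"
)

// cpuTracker is a simplified stand-in for the diff's ewmaCPUTracker: it keeps an
// exponentially weighted moving average of CPU time per peer and halves it every
// interval (the PR's decayFactor is 2), pruning peers whose EWMA reaches zero.
type cpuTracker struct {
	ewma map[string]time.Duration
}

func (c *cpuTracker) UtilizeCPU(peer string, d time.Duration) {
	c.ewma[peer] += d
}

func (c *cpuTracker) EndInterval() {
	for p, v := range c.ewma {
		v /= 2
		if v == 0 {
			delete(c.ewma, p) // prune idle peers, as the new EndInterval does
			continue
		}
		c.ewma[p] = v
	}
}

// msgThrottler mirrors the counting side of the split: stakers get a message
// allotment proportional to their weight, every peer gets a small cap on top of
// a shared pool, and Throttle answers from pending counts alone.
type msgThrottler struct {
	weights          map[string]uint64 // staker -> weight (stand-in for validators.Set)
	totalWeight      uint64
	reserved         uint32 // messages reserved for stakers
	poolSize         uint32 // size of the shared, non-reserved pool
	poolPending      uint32 // messages currently borrowed from the pool
	pending          map[string]uint32
	maxNonStakerMsgs uint32 // the new max-non-staker-pending-msgs knob
}

// allotment returns the reserved message count for a staker, 0 for non-stakers.
func (m *msgThrottler) allotment(peer string) uint32 {
	w, ok := m.weights[peer]
	if !ok {
		return 0
	}
	return uint32(float64(m.reserved) * float64(w) / float64(m.totalWeight))
}

func (m *msgThrottler) Add(peer string) {
	m.pending[peer]++
	if m.pending[peer] > m.allotment(peer) {
		m.poolPending++ // messages beyond the allotment are charged to the pool
	}
}

func (m *msgThrottler) Throttle(peer string) bool {
	if _, staker := m.weights[peer]; !staker {
		// Non-stakers: throttle once the pool is exhausted or the per-peer cap is exceeded.
		return m.poolPending > m.poolSize || m.pending[peer] > m.maxNonStakerMsgs
	}
	// Stakers: throttle only past their allotment, and only if the pool is
	// exhausted or they exceed allotment + maxNonStakerMsgs.
	maxMsgs := m.allotment(peer) + m.maxNonStakerMsgs
	return m.pending[peer] > m.allotment(peer) &&
		(m.poolPending > m.poolSize || m.pending[peer] > maxMsgs)
}

func main() {
	mt := &msgThrottler{
		weights:          map[string]uint64{"stakerA": 1, "stakerB": 1},
		totalWeight:      2,
		reserved:         2, // 0.25 * 8 messages, as in the new TestMessageThrottling
		poolSize:         6,
		pending:          map[string]uint32{},
		maxNonStakerMsgs: 3, // DefaultMaxNonStakerPendingMsgs in the diff
	}
	for i := 0; i < 4; i++ {
		mt.Add("nonStaker")
	}
	fmt.Println("throttle non-staker:", mt.Throttle("nonStaker")) // true: exceeded its cap

	ct := &cpuTracker{ewma: map[string]time.Duration{}}
	ct.UtilizeCPU("stakerA", 25*time.Millisecond)
	ct.EndInterval()
	fmt.Println("stakerA EWMA after one interval:", ct.ewma["stakerA"])
}
```

Under these assumptions, the sketch reproduces the behavior the new tests exercise: a non-staker is throttled after its fourth pending message (its cap of 3 is exceeded), while a staker may spend its allotment plus the non-staker cap before being throttled, and CPU accounting no longer influences either decision.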