51 changes: 28 additions & 23 deletions chains/manager.go
@@ -115,6 +115,7 @@ type manager struct {

stakingEnabled bool // True iff the network has staking enabled
stakerMsgPortion, stakerCPUPortion float64
maxNonStakerPendingMsgs uint32
log logging.Logger
logFactory logging.Factory
vmManager vms.Manager // Manage mappings from vm ID --> vm
@@ -152,6 +153,7 @@ type manager struct {
// TODO: Make this function take fewer arguments
func New(
stakingEnabled bool,
maxNonStakerPendingMsgs uint,
stakerMsgPortion,
stakerCPUPortion float64,
log logging.Logger,
@@ -186,29 +188,30 @@ func New(
rtr.Initialize(log, &timeoutManager, gossipFrequency, shutdownTimeout)

m := &manager{
stakingEnabled: stakingEnabled,
stakerMsgPortion: stakerMsgPortion,
stakerCPUPortion: stakerCPUPortion,
log: log,
logFactory: logFactory,
vmManager: vmManager,
decisionEvents: decisionEvents,
consensusEvents: consensusEvents,
db: db,
chainRouter: rtr,
net: net,
timeoutManager: &timeoutManager,
consensusParams: consensusParams,
validators: validators,
nodeID: nodeID,
networkID: networkID,
server: server,
keystore: keystore,
atomicMemory: atomicMemory,
avaxAssetID: avaxAssetID,
xChainID: xChainID,
criticalChains: criticalChains,
chains: make(map[[32]byte]*router.Handler),
stakingEnabled: stakingEnabled,
maxNonStakerPendingMsgs: uint32(maxNonStakerPendingMsgs),
stakerMsgPortion: stakerMsgPortion,
stakerCPUPortion: stakerCPUPortion,
log: log,
logFactory: logFactory,
vmManager: vmManager,
decisionEvents: decisionEvents,
consensusEvents: consensusEvents,
db: db,
chainRouter: rtr,
net: net,
timeoutManager: &timeoutManager,
consensusParams: consensusParams,
validators: validators,
nodeID: nodeID,
networkID: networkID,
server: server,
keystore: keystore,
atomicMemory: atomicMemory,
avaxAssetID: avaxAssetID,
xChainID: xChainID,
criticalChains: criticalChains,
chains: make(map[[32]byte]*router.Handler),
}
m.Initialize()
return m, nil
@@ -511,6 +514,7 @@ func (m *manager) createAvalancheChain(
validators,
msgChan,
defaultChannelSize,
m.maxNonStakerPendingMsgs,
m.stakerMsgPortion,
m.stakerCPUPortion,
fmt.Sprintf("%s_handler", consensusParams.Namespace),
@@ -589,6 +593,7 @@ func (m *manager) createSnowmanChain(
validators,
msgChan,
defaultChannelSize,
m.maxNonStakerPendingMsgs,
m.stakerMsgPortion,
m.stakerCPUPortion,
fmt.Sprintf("%s_handler", consensusParams.Namespace),
1 change: 1 addition & 0 deletions main/params.go
@@ -202,6 +202,7 @@ func init() {
fs.Uint64Var(&Config.DisabledStakingWeight, "staking-disabled-weight", 1, "Weight to provide to each peer when staking is disabled")

// Throttling:
fs.UintVar(&Config.MaxNonStakerPendingMsgs, "max-non-staker-pending-msgs", 3, "Maximum number of messages a non-staker is allowed to have pending.")
fs.Float64Var(&Config.StakerMsgPortion, "staker-msg-reserved", 0.2, "Reserve a portion of the chain message queue's space for stakers.")
fs.Float64Var(&Config.StakerCPUPortion, "staker-cpu-reserved", 0.2, "Reserve a portion of the chain's CPU time for stakers.")

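For illustration, the new limit is tuned at launch alongside the two existing throttling flags. A hypothetical invocation (the binary path is an assumption, not part of this PR):

    ./build/ava --max-non-staker-pending-msgs=5 --staker-msg-reserved=0.2 --staker-cpu-reserved=0.2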
19 changes: 10 additions & 9 deletions node/config.go
@@ -36,15 +36,16 @@ type Config struct {
DB database.Database

// Staking configuration
StakingIP utils.IPDesc
StakingLocalPort uint16
EnableP2PTLS bool
EnableStaking bool
StakingKeyFile string
StakingCertFile string
DisabledStakingWeight uint64
StakerMsgPortion float64
StakerCPUPortion float64
StakingIP utils.IPDesc
StakingLocalPort uint16
EnableP2PTLS bool
EnableStaking bool
StakingKeyFile string
StakingCertFile string
DisabledStakingWeight uint64
MaxNonStakerPendingMsgs uint
StakerMsgPortion float64
StakerCPUPortion float64

// Bootstrapping configuration
BootstrapPeers []*Peer
1 change: 1 addition & 0 deletions node/node.go
@@ -414,6 +414,7 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {

n.chainManager, err = chains.New(
n.Config.EnableStaking,
n.Config.MaxNonStakerPendingMsgs,
n.Config.StakerMsgPortion,
n.Config.StakerCPUPortion,
n.Log,
11 changes: 7 additions & 4 deletions snow/networking/router/chain_router_test.go
@@ -12,6 +12,7 @@ import (
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/networking/throttler"
"github.com/ava-labs/gecko/snow/networking/timeout"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/logging"
@@ -39,8 +40,9 @@ func TestShutdown(t *testing.T) {
validators.NewSet(),
nil,
1,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultMaxNonStakerPendingMsgs,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
"",
prometheus.NewRegistry(),
)
@@ -97,8 +99,9 @@ func TestShutdownTimesOut(t *testing.T) {
validators.NewSet(),
nil,
1,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultMaxNonStakerPendingMsgs,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
"",
prometheus.NewRegistry(),
)
8 changes: 2 additions & 6 deletions snow/networking/router/handler.go
@@ -18,12 +18,6 @@ import (
"github.com/ava-labs/gecko/utils/timer"
)

const (
// DefaultStakerPortion defines the default percentage of resources to
// allocate to stakers.
DefaultStakerPortion float64 = 0.2
)

// Requirement: A set of nodes spamming messages (potentially costly) shouldn't
// impact other nodes' queries.

@@ -120,6 +114,7 @@ func (h *Handler) Initialize(
validators validators.Set,
msgChan <-chan common.Message,
bufferSize int,
maxNonStakerPendingMsgs uint32,
stakerMsgPortion,
stakerCPUPortion float64,
namespace string,
@@ -159,6 +154,7 @@
consumptionRanges,
consumptionAllotments,
bufferSize,
maxNonStakerPendingMsgs,
cpuInterval,
stakerMsgPortion,
stakerCPUPortion,
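The DefaultStakerPortion constant removed above resurfaces in the throttler package, which the tests in this PR now import. The exact declarations aren't shown in this diff; a plausible sketch, assuming the values mirror the removed router constant (0.2) and the CLI default (3):

// In snow/networking/throttler (assumed declarations, not shown in this PR).
const (
	// DefaultMaxNonStakerPendingMsgs is assumed to match the CLI default of 3.
	DefaultMaxNonStakerPendingMsgs uint32 = 3
	// DefaultStakerPortion is assumed to carry over the removed 0.2 value.
	DefaultStakerPortion float64 = 0.2
)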
16 changes: 10 additions & 6 deletions snow/networking/router/handler_test.go
@@ -14,6 +14,7 @@ import (
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/networking/throttler"
)

func TestHandlerDropsTimedOutMessages(t *testing.T) {
@@ -41,8 +42,9 @@
vdrs,
nil,
16,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultMaxNonStakerPendingMsgs,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
"",
prometheus.NewRegistry(),
)
@@ -83,8 +85,9 @@ func TestHandlerDoesntDrop(t *testing.T) {
validators,
nil,
16,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultMaxNonStakerPendingMsgs,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
"",
prometheus.NewRegistry(),
)
@@ -118,8 +121,9 @@ func TestHandlerClosesOnError(t *testing.T) {
validators.NewSet(),
nil,
16,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultMaxNonStakerPendingMsgs,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
"",
prometheus.NewRegistry(),
)
27 changes: 16 additions & 11 deletions snow/networking/router/service_queue.go
@@ -31,8 +31,9 @@ type messageQueue interface {
type multiLevelQueue struct {
lock sync.Mutex

validators validators.Set
throttler throttler.Throttler
validators validators.Set
cpuTracker throttler.CPUTracker
msgThrottler throttler.CountingThrottler

// Tracks total CPU consumption
intervalConsumption, tierConsumption, cpuInterval time.Duration
@@ -60,13 +61,15 @@ func newMultiLevelQueue(
consumptionRanges []float64,
consumptionAllotments []time.Duration,
bufferSize int,
maxNonStakerPendingMsgs uint32,
cpuInterval time.Duration,
msgPortion,
cpuPortion float64,
) (messageQueue, chan struct{}) {
semaChan := make(chan struct{}, bufferSize)
singleLevelSize := bufferSize / len(consumptionRanges)
throttler := throttler.NewEWMAThrottler(vdrs, uint32(bufferSize), msgPortion, cpuPortion, cpuInterval, log)
cpuTracker := throttler.NewEWMATracker(vdrs, cpuPortion, cpuInterval, log)
msgThrottler := throttler.NewMessageThrottler(vdrs, uint32(bufferSize), maxNonStakerPendingMsgs, msgPortion, log)
queues := make([]singleLevelQueue, len(consumptionRanges))
for index := 0; index < len(queues); index++ {
gauge, histogram, err := metrics.registerTierStatistics(index)
@@ -84,7 +87,8 @@

return &multiLevelQueue{
validators: vdrs,
throttler: throttler,
cpuTracker: cpuTracker,
msgThrottler: msgThrottler,
queues: queues,
cpuRanges: consumptionRanges,
cpuAllotments: consumptionAllotments,
@@ -116,7 +120,7 @@ func (ml *multiLevelQueue) PushMessage(msg message) bool {
return false
}
ml.pendingMessages++
ml.throttler.AddMessage(msg.validatorID)
ml.msgThrottler.Add(msg.validatorID)
select {
case ml.semaChan <- struct{}{}:
default:
@@ -134,7 +138,7 @@ func (ml *multiLevelQueue) PopMessage() (message, error) {
msg, err := ml.popMessage()
if err == nil {
ml.pendingMessages--
ml.throttler.RemoveMessage(msg.validatorID)
ml.msgThrottler.Remove(msg.validatorID)
ml.metrics.pending.Dec()
}
return msg, err
@@ -145,7 +149,7 @@ func (ml *multiLevelQueue) UtilizeCPU(vdr ids.ShortID, duration time.Duration) {
ml.lock.Lock()
defer ml.lock.Unlock()

ml.throttler.UtilizeCPU(vdr, duration)
ml.cpuTracker.UtilizeCPU(vdr, duration)
ml.intervalConsumption += duration
ml.tierConsumption += duration
if ml.tierConsumption > ml.cpuAllotments[ml.currentTier] {
@@ -160,7 +164,8 @@ func (ml *multiLevelQueue) EndInterval() {
ml.lock.Lock()
defer ml.lock.Unlock()

ml.throttler.EndInterval()
ml.cpuTracker.EndInterval()
ml.msgThrottler.EndInterval()
ml.metrics.cpu.Observe(float64(ml.intervalConsumption.Milliseconds()))
ml.intervalConsumption = 0
}
@@ -189,7 +194,7 @@ func (ml *multiLevelQueue) popMessage() (message, error) {
ml.queues[ml.currentTier].waitingTime.Observe(float64(time.Since(msg.received)))

// Check where messages from this validator currently belong
cpu, _ := ml.throttler.GetUtilization(msg.validatorID)
cpu := ml.cpuTracker.GetUtilization(msg.validatorID)
correctIndex := ml.getPriorityIndex(cpu)

// If the message is at least the priority of the current tier
@@ -227,12 +232,12 @@ func (ml *multiLevelQueue) pushMessage(msg message) bool {
ml.log.Warn("Dropping message due to invalid validatorID")
return false
}
cpu, throttle := ml.throttler.GetUtilization(validatorID)
throttle := ml.msgThrottler.Throttle(validatorID)
if throttle {
ml.metrics.throttled.Inc()
return false
}

cpu := ml.cpuTracker.GetUtilization(validatorID)
queueIndex := ml.getPriorityIndex(cpu)

return ml.waterfallMessage(msg, queueIndex)
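Net effect of this file: the old combined throttler.Throttler is split in two, so CPU accounting and message-count throttling evolve independently. The shapes below are inferred from the field types (throttler.CPUTracker, throttler.CountingThrottler) and the call sites in this diff — a sketch, not the throttler package's actual source:

package throttler

import (
	"time"

	"github.com/ava-labs/gecko/ids"
)

// CPUTracker tracks per-validator CPU consumption. GetUtilization now
// returns only a utilization figure; the throttling verdict is gone.
type CPUTracker interface {
	UtilizeCPU(ids.ShortID, time.Duration)
	GetUtilization(ids.ShortID) float64
	EndInterval()
}

// CountingThrottler decides throttling purely from pending-message counts,
// bounding non-stakers via maxNonStakerPendingMsgs.
type CountingThrottler interface {
	Add(ids.ShortID)
	Remove(ids.ShortID)
	Throttle(ids.ShortID) bool
	EndInterval()
}

Concrete instances come from throttler.NewEWMATracker and throttler.NewMessageThrottler, constructed in newMultiLevelQueue as shown above.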
16 changes: 10 additions & 6 deletions snow/networking/router/service_queue_test.go
@@ -10,6 +10,7 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/gecko/snow/networking/throttler"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/logging"
)
@@ -42,9 +43,10 @@ func setupMultiLevelQueue(t *testing.T, bufferSize int) (messageQueue, chan stru
consumptionRanges,
consumptionAllotments,
bufferSize,
throttler.DefaultMaxNonStakerPendingMsgs,
time.Second,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
)

return queue, semaChan, vdrs
@@ -169,9 +171,10 @@ func TestMultiLevelQueuePrioritizes(t *testing.T) {
consumptionRanges,
consumptionAllotments,
bufferSize,
throttler.DefaultMaxNonStakerPendingMsgs,
time.Second,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
)

// Utilize CPU such that the next message from validator2 will be placed on a lower
@@ -263,9 +266,10 @@ func TestMultiLevelQueuePushesDownOldMessages(t *testing.T) {
consumptionRanges,
consumptionAllotments,
bufferSize,
throttler.DefaultMaxNonStakerPendingMsgs,
time.Second,
DefaultStakerPortion,
DefaultStakerPortion,
throttler.DefaultStakerPortion,
throttler.DefaultStakerPortion,
)

queue.PushMessage(message{
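For reference, one message's trip through the refactored queue as exercised by these tests: a successful push counts against its sender, pop releases that count, and EndInterval decays both trackers. A minimal sketch (setup elided; method and field names taken from the diff):

// queue is the messageQueue returned by setupMultiLevelQueue; vdr is a
// validator ID known to the queue's validator set.
msg := message{validatorID: vdr, received: time.Now()}
if queue.PushMessage(msg) {           // msgThrottler.Add(vdr) on success
	popped, err := queue.PopMessage() // msgThrottler.Remove(vdr)
	if err == nil {
		queue.UtilizeCPU(popped.validatorID, 10*time.Millisecond)
	}
}
queue.EndInterval() // decays the CPU EWMA and per-validator message counts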