feat: garbage collector for unused rate limiters
lklimek committed May 15, 2024
1 parent 8a9df3f commit 890302b
Showing 5 changed files with 59 additions and 16 deletions.
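At a glance, the reworked API after this commit: NewRateLimit now takes a context that bounds the lifetime of a background garbage-collection goroutine. Below is a minimal, hypothetical sketch (not part of the diff) of how it could be exercised from an in-package test. TestRateLimitUsageSketch is an invented name, converting a string to types.NodeID assumes the node-ID type is string-backed, and the (allowed, error) return of Limit is inferred from the `return true, nil` visible in the diff; log.NewTestingLogger is taken from consumer_test.go below.

package client

import (
	"context"
	"testing"

	"github.com/dashpay/tenderdash/libs/log"
	"github.com/dashpay/tenderdash/types"
)

// TestRateLimitUsageSketch shows the new constructor signature: the context
// bounds the lifetime of the garbage-collection goroutine added in this commit.
func TestRateLimitUsageSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Cancelling the context stops the limiter's garbage-collection routine.
	defer cancel()

	logger := log.NewTestingLogger(t)

	// 100 tokens per second per peer; drop (rather than wait on) messages
	// that exceed the limit.
	rl := NewRateLimit(ctx, 100, true, logger)

	// Charge one token for a message from a given peer (string-backed
	// types.NodeID is an assumption).
	ok, err := rl.Limit(ctx, types.NodeID("some-peer"), 1)
	if err != nil {
		t.Fatalf("limit: %v", err)
	}
	if !ok {
		t.Log("message would be dropped due to rate limiting")
	}
}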
4 changes: 2 additions & 2 deletions internal/mempool/p2p_msg_handler.go
@@ -23,7 +23,7 @@ type (
 	}
 )
 
-func consumerHandler(logger log.Logger, config *config.MempoolConfig, checker TxChecker, ids *IDs) client.ConsumerParams {
+func consumerHandler(ctx context.Context, logger log.Logger, config *config.MempoolConfig, checker TxChecker, ids *IDs) client.ConsumerParams {
 	chanIDs := []p2p.ChannelID{p2p.MempoolChannel}
 
 	nTokensFunc := func(e *p2p.Envelope) uint {
@@ -46,7 +46,7 @@ func consumerHandler(logger log.Logger, config *config.MempoolConfig, checker Tx
 			checker: checker,
 			ids:     ids,
 		},
		-		client.WithRecvRateLimitPerPeerHandler(config.TxRecvRateLimit, nTokensFunc, true, logger),
		+		client.WithRecvRateLimitPerPeerHandler(ctx, config.TxRecvRateLimit, nTokensFunc, true, logger),
 		client.WithValidateMessageHandler(chanIDs),
 		client.WithErrorLoggerMiddleware(logger),
 		client.WithRecoveryMiddleware(logger),
2 changes: 1 addition & 1 deletion internal/mempool/reactor.go
@@ -78,7 +78,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 		r.logger.Info("tx broadcasting is disabled")
 	}
 	go func() {
-		err := r.p2pClient.Consume(ctx, consumerHandler(r.logger, r.mempool.config, r.mempool, r.ids))
+		err := r.p2pClient.Consume(ctx, consumerHandler(ctx, r.logger, r.mempool.config, r.mempool, r.ids))
 		if err != nil {
 			r.logger.Error("failed to consume p2p checker messages", "error", err)
 		}
7 changes: 5 additions & 2 deletions internal/p2p/client/consumer.go
@@ -45,6 +45,9 @@ type (
 		next ConsumerHandler
 	}
 
+	// TokenNumberFunc is a function that returns number of tokens to consume for a given envelope
+	TokenNumberFunc func(*p2p.Envelope) uint
+
 	recvRateLimitPerPeerHandler struct {
 		RateLimit
 
@@ -88,10 +91,10 @@ func WithValidateMessageHandler(allowedChannelIDs []p2p.ChannelID) ConsumerMiddl
 	}
 }
 
-func WithRecvRateLimitPerPeerHandler(limit float64, nTokensFunc TokenNumberFunc, drop bool, logger log.Logger) ConsumerMiddlewareFunc {
+func WithRecvRateLimitPerPeerHandler(ctx context.Context, limit float64, nTokensFunc TokenNumberFunc, drop bool, logger log.Logger) ConsumerMiddlewareFunc {
 	return func(next ConsumerHandler) ConsumerHandler {
 		hd := &recvRateLimitPerPeerHandler{
-			RateLimit:   *NewRateLimit(limit, drop, logger),
+			RateLimit:   *NewRateLimit(ctx, limit, drop, logger),
 			nTokensFunc: nTokensFunc,
 		}
 
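For reference, a hedged sketch (not part of the commit) of composing the updated middleware on its own. newLimitedHandler is an invented helper; the flat one-token-per-envelope TokenNumberFunc mirrors the one used in consumer_test.go below, `next` stands for whatever ConsumerHandler the middleware should wrap, and applying the returned ConsumerMiddlewareFunc directly to a handler follows the same pattern as the test's call with (fakeHandler).

package client

import (
	"context"

	"github.com/dashpay/tenderdash/internal/p2p"
	"github.com/dashpay/tenderdash/libs/log"
)

// newLimitedHandler is a hypothetical helper showing how the middleware is
// composed: it wraps `next` so that each peer may spend at most `limit`
// tokens per second, charging a flat one token per envelope.
func newLimitedHandler(ctx context.Context, limit float64, next ConsumerHandler, logger log.Logger) ConsumerHandler {
	oneToken := func(*p2p.Envelope) uint { return 1 }

	// `true` silently drops over-limit envelopes instead of waiting for the
	// limiter; the context bounds the per-peer rate limiter's GC goroutine.
	mw := WithRecvRateLimitPerPeerHandler(ctx, limit, oneToken, true, logger)

	return mw(next)
}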
2 changes: 1 addition & 1 deletion internal/p2p/client/consumer_test.go
@@ -180,7 +180,7 @@ func TestRateLimitHandler(t *testing.T) {
 	logger := log.NewTestingLogger(t)
 	client := &Client{}
 
-	mw := WithRecvRateLimitPerPeerHandler(RateLimit, func(*p2p.Envelope) uint { return 1 }, false, logger)(fakeHandler).(*recvRateLimitPerPeerHandler)
+	mw := WithRecvRateLimitPerPeerHandler(ctx, RateLimit, func(*p2p.Envelope) uint { return 1 }, false, logger)(fakeHandler).(*recvRateLimitPerPeerHandler)
 	mw.burst = Burst
 
 	start := sync.RWMutex{}
60 changes: 50 additions & 10 deletions internal/p2p/client/ratelimit.go
@@ -4,16 +4,16 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"golang.org/x/time/rate"
 
 	"github.com/dashpay/tenderdash/internal/p2p"
 	"github.com/dashpay/tenderdash/libs/log"
 	"github.com/dashpay/tenderdash/types"
 )
 
-type TokenNumberFunc func(*p2p.Envelope) uint
+const PeerRateLimitLifetime = 60 // number of seconds to keep the rate limiter for a peer
 
 // RateLimit is a rate limiter for p2p messages.
 // It is used to limit the rate of incoming messages from a peer.
@@ -34,35 +34,49 @@ type RateLimit struct {
 	logger log.Logger
 }
 
+type limiter struct {
+	*rate.Limiter
+	// lastAccess is the last time the limiter was accessed, as Unix time (seconds)
+	lastAccess atomic.Int64
+}
+
 // NewRateLimit creates a new rate limiter.
 //
 // # Arguments
 //
+// * `ctx` - context; used to gracefully shutdown the garbage collection routine
 // * `limit` - rate limit per peer per second; 0 means no limit
 // * `drop` - silently drop the message if the rate limit is exceeded; otherwise we will wait until the message is allowed
 // * `logger` - logger
-func NewRateLimit(limit float64, drop bool, logger log.Logger) *RateLimit {
-	return &RateLimit{
+func NewRateLimit(ctx context.Context, limit float64, drop bool, logger log.Logger) *RateLimit {
+	h := &RateLimit{
 		limiters: sync.Map{},
 		limit:    limit,
 		burst:    int(DefaultRecvBurstMultiplier * limit),
 		drop:     drop,
 		logger:   logger,
 	}
+
+	// start the garbage collection routine
+	go h.gcRoutine(ctx)
+
+	return h
 }
 
-func (h *RateLimit) getLimiter(peerID types.NodeID) *rate.Limiter {
-	var limiter *rate.Limiter
+func (h *RateLimit) getLimiter(peerID types.NodeID) *limiter {
+	var limit *limiter
 	if l, ok := h.limiters.Load(peerID); ok {
-		limiter = l.(*rate.Limiter)
+		limit = l.(*limiter)
 	} else {
-		limiter = rate.NewLimiter(rate.Limit(h.limit), h.burst)
+		limit = &limiter{Limiter: rate.NewLimiter(rate.Limit(h.limit), h.burst)}
 		// we have a slight race condition here, possibly overwriting the limiter, but it's not a big deal
 		// as the worst case scenario is that we allow one or two more messages than we should
-		h.limiters.Store(peerID, limiter)
+		h.limiters.Store(peerID, limit)
 	}
 
-	return limiter
+	limit.lastAccess.Store(time.Now().Unix())
+
+	return limit
 }
 
 // Limit waits for the rate limit to allow the message to be sent.
@@ -90,3 +104,29 @@ func (h *RateLimit) Limit(ctx context.Context, peerID types.NodeID, nTokens int)
 	}
 	return true, nil
 }
+
+// gcRoutine is a goroutine that removes unused limiters for peers every `PeerRateLimitLifetime` seconds.
+func (h *RateLimit) gcRoutine(ctx context.Context) {
+	ticker := time.NewTicker(PeerRateLimitLifetime * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			h.gc()
+		}
+	}
+}
+
+// GC removes old limiters.
+func (h *RateLimit) gc() {
+	now := time.Now().Unix()
+	h.limiters.Range(func(key, value interface{}) bool {
+		if value.(*limiter).lastAccess.Load() < now-60 {
+			h.limiters.Delete(key)
+		}
+		return true
+	})
+}
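
A hypothetical in-package test (not part of this commit) sketching how the sweep could be exercised directly: back-date one peer's lastAccess beyond PeerRateLimitLifetime and check that gc() evicts it while keeping a fresh one. TestRateLimitGCSketch is an invented name, and the string-to-types.NodeID conversions assume a string-backed node-ID type; everything else uses only identifiers visible in the diff above.

package client

import (
	"context"
	"testing"
	"time"

	"github.com/dashpay/tenderdash/libs/log"
	"github.com/dashpay/tenderdash/types"
)

func TestRateLimitGCSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	h := NewRateLimit(ctx, 1, true, log.NewTestingLogger(t))

	stale := types.NodeID("stale-peer")
	fresh := types.NodeID("fresh-peer")

	// getLimiter lazily creates the per-peer limiter and stamps lastAccess.
	// Back-date the stale peer well past the lifetime threshold.
	h.getLimiter(stale).lastAccess.Store(time.Now().Add(-2 * PeerRateLimitLifetime * time.Second).Unix())
	h.getLimiter(fresh)

	// Run one sweep by hand instead of waiting for the ticker in gcRoutine.
	h.gc()

	if _, ok := h.limiters.Load(stale); ok {
		t.Error("expected the stale limiter to be garbage collected")
	}
	if _, ok := h.limiters.Load(fresh); !ok {
		t.Error("expected the fresh limiter to be kept")
	}
}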
