Skip to content

Commit

Permalink
fix: exporting method instead of exporting field on middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed May 3, 2023
1 parent 7ef47a3 commit 221bf8a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 15 deletions.
15 changes: 10 additions & 5 deletions share/p2p/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
var log = logging.Logger("shrex/middleware")

type Middleware struct {
// NumRateLimited is the number of requests that were rate limited. It is reset to 0 every time
// it is read and observed into metrics.
NumRateLimited atomic.Int64

// concurrencyLimit is the maximum number of requests that can be processed at once.
concurrencyLimit int64
// parallelRequests is the number of requests currently being processed.
parallelRequests atomic.Int64
// numRateLimited is the number of requests that were rate limited.
numRateLimited atomic.Int64
}

func NewMiddleware(concurrencyLimit int) *Middleware {
Expand All @@ -24,13 +24,18 @@ func NewMiddleware(concurrencyLimit int) *Middleware {
}
}

// DrainCounter returns the current value of the rate limit counter and resets it to 0.
func (m *Middleware) DrainCounter() int64 {
return m.numRateLimited.Swap(0)
}

func (m *Middleware) RateLimitHandler(handler network.StreamHandler) network.StreamHandler {
return func(stream network.Stream) {
current := m.parallelRequests.Add(1)
defer m.parallelRequests.Add(-1)

if current > m.concurrencyLimit {
m.NumRateLimited.Add(1)
m.numRateLimited.Add(1)
log.Debug("concurrency limit reached")
err := stream.Close()
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions share/p2p/shrexeds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ func (s *Server) Stop(context.Context) error {
}

func (s *Server) observeRateLimitedRequests(ctx context.Context) {
var numRateLimited int64
if s.metrics != nil {
numRateLimited = s.middleware.NumRateLimited.Swap(0)
}

numRateLimited := s.middleware.DrainCounter()
if numRateLimited > 0 {
s.metrics.ObserveRequests(ctx, numRateLimited, p2p.StatusRateLimited)
}
Expand Down
6 changes: 1 addition & 5 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ func (srv *Server) Stop(context.Context) error {
}

func (srv *Server) observeRateLimitedRequests(ctx context.Context) {
var numRateLimited int64
if srv.metrics != nil {
numRateLimited = srv.middleware.NumRateLimited.Swap(0)
}

numRateLimited := srv.middleware.DrainCounter()
if numRateLimited > 0 {
srv.metrics.ObserveRequests(ctx, numRateLimited, p2p.StatusRateLimited)
}
Expand Down

0 comments on commit 221bf8a

Please sign in to comment.