txHandler: applications rate limiter #5734

Merged

Changes from 28 commits

33 commits
698221f
txHandler: sliding windows rate limiter
algorandskiy Sep 11, 2023
b780eb2
Add remote IP addr to app idx filtering
algorandskiy Sep 11, 2023
4227408
More tests + linter fixes
algorandskiy Sep 12, 2023
d912853
remove some code duplication
algorandskiy Sep 12, 2023
a142581
Shards per app
algorandskiy Sep 12, 2023
ad931f6
make interval atomic
algorandskiy Sep 12, 2023
21722be
move rate limiter configuration into local config + test
algorandskiy Sep 13, 2023
43ea42e
Fix max size checks for buckets
algorandskiy Sep 14, 2023
39ec627
Add salt to prevent censoring with reduced key size to 8 bytes
algorandskiy Sep 14, 2023
785a229
go benchmarks
algorandskiy Sep 14, 2023
e05d839
Implement LRU eviction
algorandskiy Sep 15, 2023
17127ac
Fix cache size less than number of buckets
algorandskiy Sep 18, 2023
c7e7d0b
Add app limter drop counter
algorandskiy Sep 20, 2023
c30298b
CR fixes
algorandskiy Sep 20, 2023
f5fabd5
Use admission rate instead attempted rate
algorandskiy Sep 20, 2023
7b7d4f6
Revert "Use admission rate instead attempted rate"
algorandskiy Sep 20, 2023
8902be4
Reimplement cache admission
algorandskiy Sep 20, 2023
e5c98d5
CR fixes: config rename, buckets restructure
algorandskiy Sep 22, 2023
70a6ba4
CR: use part of ipv6 address
algorandskiy Sep 22, 2023
a53c07a
start app rate limiting only of congested
algorandskiy Sep 22, 2023
612156a
use rawmsg.Received timestamp for testability and cut some ns of math
algorandskiy Sep 25, 2023
61e0264
use sync.Pool for keys and buckets
algorandskiy Sep 26, 2023
1a00cf4
set TxBacklogAppTxPerSecondRate=100
algorandskiy Sep 26, 2023
3459298
CR fixes
algorandskiy Sep 27, 2023
1e9b73d
perf: upgrade go-deadlock
algorandskiy Sep 27, 2023
d83565b
CR: do not use rawmsg.Received for limiting
algorandskiy Sep 29, 2023
ab25e9b
Merge remote-tracking branch 'upstream/master' into pavel/txhandler-a…
algorandskiy Oct 31, 2023
7d32f6e
config: migrate to v32
algorandskiy Oct 31, 2023
a66abbf
CR: enable app limiter separately with EnableAppTxBacklogRateLimiting
algorandskiy Nov 3, 2023
4ede214
wip: txgroupToKeysDups
algorandskiy Nov 7, 2023
5fc75b6
CR: dedup appids and zeros
algorandskiy Nov 8, 2023
8c4c28b
marginally optimize txnToDigest a bit
algorandskiy Nov 8, 2023
eb977e2
CR: config renaming/comment fix
algorandskiy Nov 8, 2023
5 changes: 5 additions & 0 deletions config/consensus.go
@@ -633,6 +633,9 @@ var StateProofTopVoters int
// in a block must not exceed MaxTxnBytesPerBlock.
var MaxTxnBytesPerBlock int

// MaxAppTxnForeignApps is the max number of foreign apps per txn across all consensus versions
var MaxAppTxnForeignApps int

func checkSetMax(value int, curMax *int) {
if value > *curMax {
*curMax = value
@@ -681,6 +684,8 @@ func checkSetAllocBounds(p ConsensusParams) {
checkSetMax(p.MaxAppKeyLen, &MaxAppBytesKeyLen)
checkSetMax(int(p.StateProofTopVoters), &StateProofTopVoters)
checkSetMax(p.MaxTxnBytesPerBlock, &MaxTxnBytesPerBlock)

checkSetMax(p.MaxAppTxnForeignApps, &MaxAppTxnForeignApps)
}

// SaveConfigurableConsensus saves the configurable protocols file to the provided data directory.
14 changes: 12 additions & 2 deletions config/localTemplate.go
@@ -42,7 +42,7 @@ type Local struct {
// Version tracks the current version of the defaults so we can migrate old -> new
// This is specifically important whenever we decide to change the default value
// for an existing parameter. This field tag must be updated any time we add a new version.
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31"`
Version uint32 `version[0]:"0" version[1]:"1" version[2]:"2" version[3]:"3" version[4]:"4" version[5]:"5" version[6]:"6" version[7]:"7" version[8]:"8" version[9]:"9" version[10]:"10" version[11]:"11" version[12]:"12" version[13]:"13" version[14]:"14" version[15]:"15" version[16]:"16" version[17]:"17" version[18]:"18" version[19]:"19" version[20]:"20" version[21]:"21" version[22]:"22" version[23]:"23" version[24]:"24" version[25]:"25" version[26]:"26" version[27]:"27" version[28]:"28" version[29]:"29" version[30]:"30" version[31]:"31" version[32]:"32"`

// Archival nodes retain a full copy of the block history. Non-Archival nodes will delete old blocks and only retain what's need to properly validate blockchain messages (the precise number of recent blocks depends on the consensus parameters. Currently the last 1321 blocks are required). This means that non-Archival nodes require significantly less storage than Archival nodes. Relays (nodes with a valid NetAddress) are always Archival, regardless of this setting. This may change in the future. If setting this to true for the first time, the existing ledger may need to be deleted to get the historical values stored as the setting only effects current blocks forward. To do this, shutdown the node and delete all .sqlite files within the data/testnet-version directory, except the crash.sqlite file. Restart the node and wait for the node to sync.
Archival bool `version[0]:"false"`
@@ -231,7 +231,17 @@ type Local struct {
// TxBacklogReservedCapacityPerPeer determines how much dedicated serving capacity the TxBacklog gives each peer
TxBacklogReservedCapacityPerPeer int `version[27]:"20"`

// EnableTxBacklogRateLimiting controls if a rate limiter and congestion manager shouild be attached to the tx backlog enqueue process
// TxBacklogAppTxRateLimiterMaxSize denotes a max size for the app tx rate limiter,
// calculated as "a thousand apps on a network of a thousand peers"
TxBacklogAppTxRateLimiterMaxSize int `version[32]:"1048576"`

// TxBacklogAppTxPerSecondRate determines the target app-per-second rate for the app tx rate limiter
TxBacklogAppTxPerSecondRate int `version[32]:"100"`

// TxBacklogRateLimitingCongestionPct determines the backlog filling threshold percentage at which the app rate limiter kicks in
TxBacklogRateLimitingCongestionPct int `version[32]:"50"`

// EnableTxBacklogRateLimiting controls if a rate limiter and congestion manager should be attached to the tx backlog enqueue process
// if enabled, the over-all TXBacklog Size will be larger by MAX_PEERS*TxBacklogReservedCapacityPerPeer
EnableTxBacklogRateLimiting bool `version[27]:"false" version[30]:"true"`

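These three fields are consumed by the transaction handler's backlog enqueue path, which is not part of this excerpt. The sketch below is illustrative only; backlogLen and backlogCap are hypothetical arguments standing in for the current backlog occupancy and capacity. It shows how the congestion percentage could gate the per-app limiter, while the limiter itself is sized from TxBacklogAppTxRateLimiterMaxSize and TxBacklogAppTxPerSecondRate (see makeAppRateLimiter in data/appRateLimiter.go below).

// Illustrative sketch, not part of this diff.
func appRateLimitingActive(cfg config.Local, backlogLen, backlogCap int) bool {
	// engage the per-app limiter only once the backlog is at least
	// TxBacklogRateLimitingCongestionPct percent full (50% by default)
	return backlogLen*100 >= backlogCap*cfg.TxBacklogRateLimitingCongestionPct
}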
5 changes: 4 additions & 1 deletion config/local_defaults.go
@@ -20,7 +20,7 @@
package config

var defaultLocal = Local{
Version: 31,
Version: 32,
AccountUpdatesStatsInterval: 5000000000,
AccountsRebuildSynchronousMode: 1,
AgreementIncomingBundlesQueueLength: 15,
@@ -141,6 +141,9 @@ var defaultLocal = Local{
TrackerDBDir: "",
TransactionSyncDataExchangeRate: 0,
TransactionSyncSignificantMessageThreshold: 0,
TxBacklogAppTxPerSecondRate: 100,
TxBacklogAppTxRateLimiterMaxSize: 1048576,
TxBacklogRateLimitingCongestionPct: 50,
TxBacklogReservedCapacityPerPeer: 20,
TxBacklogServiceRateWindowSeconds: 10,
TxBacklogSize: 26000,
305 changes: 305 additions & 0 deletions data/appRateLimiter.go
@@ -0,0 +1,305 @@
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package data

import (
"encoding/binary"
"sync"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-deadlock"
"golang.org/x/crypto/blake2b"
)

const numBuckets = 128

type keyType [8]byte

// appRateLimiter implements a sliding window counter rate limiter for applications.
// It is a sharded map with numBuckets maps, each protected by its own mutex.
// The bucket is selected by hashing the application index with a seed (see memhash64).
// LRU is used to evict entries from each bucket, and "last use" is updated on each attempt, not on admission.
// This is done mostly to simplify the implementation and does not appear to affect correctness.
type appRateLimiter struct {
maxBucketSize int
serviceRatePerWindow uint64
serviceRateWindow time.Duration

// seed for hashing application index to bucket
seed uint64
// salt for hashing application index + origin address
salt [16]byte

buckets [numBuckets]appRateLimiterBucket

// evictions
// TODO: delete?
evictions uint64
evictionTime uint64
}

type appRateLimiterBucket struct {
entries map[keyType]*appRateLimiterEntry
lru *util.List[keyType]
mu deadlock.RWMutex // mutex protects both map and the list access
}

type appRateLimiterEntry struct {
prev atomic.Int64
cur atomic.Int64
interval int64 // numeric representation of the current interval value
lruElement *util.ListNode[keyType]
}

// makeAppRateLimiter creates a new appRateLimiter from the parameters:
// maxCacheSize is the maximum number of entries to keep in the cache, keeping its memory usage bounded
// maxAppPeerRate is the maximum number of admitted apps per peer per second
// serviceRateWindow is the length of the sliding service window
func makeAppRateLimiter(maxCacheSize int, maxAppPeerRate uint64, serviceRateWindow time.Duration) *appRateLimiter {
// convert target per app rate to per window service rate
serviceRatePerWindow := maxAppPeerRate * uint64(serviceRateWindow/time.Second)
maxBucketSize := maxCacheSize / numBuckets
if maxBucketSize == 0 {
// the max size is less than the number of buckets, use maps of size 1
maxBucketSize = 1
}
r := &appRateLimiter{
maxBucketSize: maxBucketSize,
serviceRatePerWindow: serviceRatePerWindow,
serviceRateWindow: serviceRateWindow,
seed: crypto.RandUint64(),
}
crypto.RandBytes(r.salt[:])

for i := 0; i < numBuckets; i++ {
r.buckets[i] = appRateLimiterBucket{entries: make(map[keyType]*appRateLimiterEntry), lru: util.NewList[keyType]()}
}
return r
}
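// Illustrative numbers (not part of this file): with the config defaults
// TxBacklogAppTxRateLimiterMaxSize = 1048576 and TxBacklogAppTxPerSecondRate = 100,
// and an assumed one-second serviceRateWindow:
//	maxBucketSize        = 1048576 / 128 = 8192 entries per bucket
//	serviceRatePerWindow = 100 * 1       = 100 admissions per window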

func (r *appRateLimiter) entry(b *appRateLimiterBucket, key keyType, curInt int64) (*appRateLimiterEntry, bool) {
b.mu.Lock()
defer b.mu.Unlock()

if len(b.entries) >= r.maxBucketSize {
// evict the oldest entry
start := time.Now()

el := b.lru.Back()
delete(b.entries, el.Value)
b.lru.Remove(el)

atomic.AddUint64(&r.evictions, 1)
atomic.AddUint64(&r.evictionTime, uint64(time.Since(start)))
}

entry, ok := b.entries[key]
if ok {
el := entry.lruElement
// note, the entry is marked as recently used even before the rate limiting decision
// since it does not make sense to evict keys that are actively attempted
b.lru.MoveToFront(el)

// the same logic is applicable to the intervals: if a new interval is started, update the entry
// by moving the current value to the previous and resetting the current.
// this is done under a lock so that the interval is not updated concurrently.
// The rationale: even if this request is going to be dropped, the new interval has already started
// and it is OK to roll it over so it is ready for upcoming requests
var newPrev int64 = 0
switch entry.interval {
case curInt:
// the interval is the same, do nothing
case curInt - 1:
// these are contiguous intervals, use the current value as the new previous
newPrev = entry.cur.Load()
fallthrough
default:
// non-contiguous intervals, reset the entry
entry.prev.Store(newPrev)
entry.cur.Store(0)
entry.interval = curInt
}
} else {
el := b.lru.PushFront(key)
entry = &appRateLimiterEntry{interval: curInt, lruElement: el}
b.entries[key] = entry
}
return entry, ok
}

// interval calculates the interval numeric representation based on the given time
func (r *appRateLimiter) interval(nowNano int64) int64 {
return nowNano / int64(r.serviceRateWindow)
}

// fraction calculates how much of the current interval has elapsed at the given time
func (r *appRateLimiter) fraction(nowNano int64) float64 {
return float64(nowNano%int64(r.serviceRateWindow)) / float64(r.serviceRateWindow)
}
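// Worked example (illustrative): with serviceRateWindow = 1s and nowNano = 2_500_000_000,
//	interval(nowNano) = 2_500_000_000 / 1_000_000_000 = 2
//	fraction(nowNano) = 500_000_000 / 1_000_000_000   = 0.5
// i.e. the timestamp falls in window #2, half of which has already elapsed.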

// shouldDrop returns true if the given transaction group should be dropped based on
// the rate for the applications in the group: the entire group is dropped if a single application
// exceeds the rate.
func (r *appRateLimiter) shouldDrop(txgroup []transactions.SignedTxn, origin []byte) bool {
return r.shouldDropAt(txgroup, origin, time.Now().UnixNano())
}

// shouldDropAt is the same as shouldDrop but accepts the current time as a parameter
// in order to make it testable
func (r *appRateLimiter) shouldDropAt(txgroup []transactions.SignedTxn, origin []byte, nowNano int64) bool {
keysBuckets := txgroupToKeys(txgroup, origin, r.seed, r.salt, numBuckets)
defer putAppKeyBuf(keysBuckets)
if len(keysBuckets.keys) == 0 {
return false
}
return r.shouldDropKeys(keysBuckets.buckets, keysBuckets.keys, nowNano)
}

func (r *appRateLimiter) shouldDropKeys(buckets []int, keys []keyType, nowNano int64) bool {
curInt := r.interval(nowNano)
curFraction := r.fraction(nowNano)

for i, key := range keys {
// TODO: reuse last entry for matched keys and buckets?
b := buckets[i]
entry, has := r.entry(&r.buckets[b], key, curInt)
if !has {
// new entry, defaults are provided by entry() function
// admit and increment
entry.cur.Add(1)
continue
}

rate := int64(float64(entry.prev.Load())*(1-curFraction)) + entry.cur.Load() + 1
if rate > int64(r.serviceRatePerWindow) {
return true
}
entry.cur.Add(1)
}

return false
}
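// Worked example (illustrative): with serviceRatePerWindow = 100, an entry that
// recorded prev = 80 admissions in the previous window and cur = 70 so far in the
// current one, evaluated at curFraction = 0.5:
//	rate = 80*(1-0.5) + 70 + 1 = 111 > 100  => the whole group is dropped
// The same counters evaluated late in the window (curFraction = 0.9) give
//	rate = 80*0.1 + 70 + 1 = 79 <= 100      => the group is admitted and cur becomes 71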

func (r *appRateLimiter) len() int {
var count int
for i := 0; i < numBuckets; i++ {
r.buckets[i].mu.RLock()
count += len(r.buckets[i].entries)
r.buckets[i].mu.RUnlock()
}
return count
}

var appKeyPool = sync.Pool{
New: func() interface{} {
return &appKeyBuf{
// max config.MaxTxGroupSize app call txns per txgroup, each referencing up to MaxAppTxnForeignApps extra foreign apps
// at the time of writing, config.MaxTxGroupSize = 16 and config.MaxAppTxnForeignApps = 8
keys: make([]keyType, 0, config.MaxTxGroupSize*(1+config.MaxAppTxnForeignApps)),
buckets: make([]int, 0, config.MaxTxGroupSize*(1+config.MaxAppTxnForeignApps)),
}
},
}

// appKeyBuf is a reusable storage for key and bucket slices
type appKeyBuf struct {
keys []keyType
buckets []int
}

func getAppKeyBuf() *appKeyBuf {
buf := appKeyPool.Get().(*appKeyBuf)
buf.buckets = buf.buckets[:0]
buf.keys = buf.keys[:0]
return buf
}

func putAppKeyBuf(buf *appKeyBuf) {
appKeyPool.Put(buf)
}

// txgroupToKeys converts txgroup data to keys
func txgroupToKeys(txgroup []transactions.SignedTxn, origin []byte, seed uint64, salt [16]byte, numBuckets int) *appKeyBuf {
keysBuckets := getAppKeyBuf()
// since blake2 is a cryptographic hash function it seems OK to shrink the 32-byte digest down to 8 bytes.
// Rationale: we expect thousands of apps sent from thousands of peers,
// so millions of unique pairs are required => 8 bytes should be enough.
// The 16-byte salt makes it harder to find collisions if an adversary attempts to censor
// some app by finding a collision with it and flooding the network with such transactions:
// h(app + relay_ip) = h(app2 + relay_ip).
var buf [8 + 16 + 16]byte // uint64 + 16 bytes of salt + up to 16 bytes of address
txnToDigest := func(appIdx basics.AppIndex) keyType {
binary.LittleEndian.PutUint64(buf[:8], uint64(appIdx))
copy(buf[8:], salt[:])
copied := copy(buf[8+16:], origin)

h := blake2b.Sum256(buf[:8+16+copied])
var key keyType
copy(key[:], h[:len(key)])
return key
}
txnToBucket := func(appIdx basics.AppIndex) int {
return int(memhash64(uint64(appIdx), seed) % uint64(numBuckets))
}
for i := range txgroup {
if txgroup[i].Txn.Type == protocol.ApplicationCallTx {
appIdx := txgroup[i].Txn.ApplicationID
// hash appIdx into a bucket; do not use modulo without hashing first since it could
// assign two vanilla (and presumably popular) apps to the same bucket.
keysBuckets.buckets = append(keysBuckets.buckets, txnToBucket(appIdx))
keysBuckets.keys = append(keysBuckets.keys, txnToDigest(appIdx))
if len(txgroup[i].Txn.ForeignApps) > 0 {
for _, appIdx := range txgroup[i].Txn.ForeignApps {
keysBuckets.buckets = append(keysBuckets.buckets, txnToBucket(appIdx))
keysBuckets.keys = append(keysBuckets.keys, txnToDigest(appIdx))
}
}
}
}
return keysBuckets
}
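// Illustrative note (not part of this file): the bucket depends only on the app id,
// while the 8-byte key also commits to the origin. For example, app 123 reported by
// two different relays lands in the same bucket (memhash64(123, seed) % numBuckets)
// but yields two distinct keys, so one peer flooding app 123 does not consume the
// rate budget tracked for another peer sending the same app.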

const (
// Constants for multiplication: four random odd 64-bit numbers.
m1 = 16877499708836156737
m2 = 2820277070424839065
m3 = 9497967016996688599
m4 = 15839092249703872147
)

// memhash64 is the uint64 hash function from the Go runtime
// https://go-review.googlesource.com/c/go/+/59352/4/src/runtime/hash64.go#96
func memhash64(val uint64, seed uint64) uint64 {
h := seed
h ^= val
h = rotl31(h*m1) * m2
h ^= h >> 29
h *= m3
h ^= h >> 32
return h
}

func rotl31(x uint64) uint64 {
return (x << 31) | (x >> (64 - 31))
}
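
For context, a minimal usage sketch of the limiter (illustrative only: the real call site is the transaction handler's backlog enqueue path, which is not part of this excerpt, and the one-second window is an assumption):

// Illustrative usage sketch, not part of this diff; would live in package data.
func exampleAppRateLimiterUsage(txgroup []transactions.SignedTxn, origin []byte) bool {
	// in practice the limiter is created once at handler startup, not per call:
	// ~1M cached (app, origin) entries, 100 app txns per second per origin, assumed 1s window
	rl := makeAppRateLimiter(1048576, 100, time.Second)
	// origin identifies the sending peer, e.g. (part of) the remote IP address.
	// shouldDrop returns true if any app referenced by the group exceeds its rate,
	// in which case the whole group is dropped before reaching the backlog.
	return rl.shouldDrop(txgroup, origin)
}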