Skip to content

Commit

Permalink
Add remote IP addr to app idx filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Sep 11, 2023
1 parent 2cff84e commit 37de57d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 32 deletions.
47 changes: 29 additions & 18 deletions data/appRateLimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package data

import (
"encoding/binary"
"sync/atomic"
"time"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"golang.org/x/crypto/blake2b"

"github.com/algorand/go-deadlock"
)
Expand All @@ -36,15 +38,15 @@ type appRateLimiter struct {
// TODO: consider some kind of concurrent map
// TODO: add expiration strategy
mu deadlock.RWMutex
apps map[basics.AppIndex]*appRateLimiterEntry
apps map[crypto.Digest]*appRateLimiterEntry
}

func MakeAppRateLimiter(maxSize uint64, serviceRate uint64, serviceRateWindow time.Duration) *appRateLimiter {

Check failure on line 44 in data/appRateLimiter.go

View workflow job for this annotation

GitHub Actions / reviewdog-errors

[Lint Errors] reported by reviewdog 🐶 exported: exported function MakeAppRateLimiter should have comment or be unexported (revive) Raw Output: data/appRateLimiter.go:44:1: exported: exported function MakeAppRateLimiter should have comment or be unexported (revive) func MakeAppRateLimiter(maxSize uint64, serviceRate uint64, serviceRateWindow time.Duration) *appRateLimiter { ^
return &appRateLimiter{
maxSize: maxSize,
serviceRate: serviceRate,
serviceRateWindow: serviceRateWindow,
apps: map[basics.AppIndex]*appRateLimiterEntry{},
apps: map[crypto.Digest]*appRateLimiterEntry{},
}
}

Expand All @@ -54,14 +56,14 @@ type appRateLimiterEntry struct {
interval int64 // numeric representation of the current interval value
}

func (r *appRateLimiter) entry(appIdx basics.AppIndex) (*appRateLimiterEntry, bool) {
func (r *appRateLimiter) entry(key crypto.Digest) (*appRateLimiterEntry, bool) {
r.mu.Lock()
defer r.mu.Unlock()

entry, ok := r.apps[appIdx]
entry, ok := r.apps[key]
if !ok {
entry = &appRateLimiterEntry{}
r.apps[appIdx] = entry
r.apps[key] = entry
}
return entry, ok
}
Expand All @@ -79,29 +81,38 @@ func (r *appRateLimiter) fraction(now time.Time) float64 {
// shouldDrop returns true if the given transaction group should be dropped based on the
// on the rate for the applications in the group: the entire group is dropped if a single application
// exceeds the rate.
func (r *appRateLimiter) shouldDrop(txgroup []transactions.SignedTxn) bool {
return r.shouldDropInner(txgroup, time.Now())
func (r *appRateLimiter) shouldDrop(txgroup []transactions.SignedTxn, origin []byte) bool {
return r.shouldDropInner(txgroup, origin, time.Now())
}

func (r *appRateLimiter) shouldDropInner(txgroup []transactions.SignedTxn, now time.Time) bool {
var apps []basics.AppIndex
func (r *appRateLimiter) shouldDropInner(txgroup []transactions.SignedTxn, origin []byte, now time.Time) bool {
var keys []crypto.Digest
// TODO: check memory allocs
var buf [8 + 4]byte // uint64 + [4]byte
for i := range txgroup {
if txgroup[i].Txn.Type == protocol.ApplicationCallTx {
apps = append(apps, txgroup[i].Txn.ApplicationID)
// TODO: enable? this could allow apps censoring
// if len(txgroup[i].Txn.ForeignApps) > 0 {
// apps = append(apps, txgroup[i].Txn.ForeignApps...)
// }
binary.LittleEndian.PutUint64(buf[:8], uint64(txgroup[i].Txn.ApplicationID))
toBeHashed := append(buf[8:], origin...)
d := crypto.Digest(blake2b.Sum256(toBeHashed))
keys = append(keys, d)
if len(txgroup[i].Txn.ForeignApps) > 0 {
for j := range txgroup[i].Txn.ForeignApps {
binary.LittleEndian.PutUint64(buf[:8], uint64(txgroup[i].Txn.ForeignApps[j]))
toBeHashed := append(buf[8:], origin...)
d := crypto.Digest(blake2b.Sum256(toBeHashed))
keys = append(keys, d)
}
}
}
}
if len(apps) == 0 {
if len(keys) == 0 {
return false
}

curInt := r.interval(now)

for _, appIdx := range apps {
entry, has := r.entry(appIdx)
for _, key := range keys {
entry, has := r.entry(key)
if !has {
// new entry, fill defaults and continue
entry.interval = curInt
Expand Down
26 changes: 13 additions & 13 deletions data/appRateLimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestAppRateLimiter_NoApps(t *testing.T) {
},
},
}
drop := rm.shouldDrop(txns)
drop := rm.shouldDrop(txns, nil)
require.False(t, drop)
}

Expand All @@ -72,15 +72,15 @@ func TestAppRateLimiter_Basics(t *testing.T) {

txns := getAppTxnGroup(1)
now := time.Now()
drop := rm.shouldDropInner(txns, now)
drop := rm.shouldDropInner(txns, nil, now)
require.False(t, drop)

for i := len(txns); i < int(rate); i++ {
drop = rm.shouldDropInner(txns, now)
drop = rm.shouldDropInner(txns, nil, now)
require.False(t, drop)
}

drop = rm.shouldDropInner(txns, now)
drop = rm.shouldDropInner(txns, nil, now)
require.True(t, drop)

// check a single group with exceed rate is dropped
Expand All @@ -92,10 +92,10 @@ func TestAppRateLimiter_Basics(t *testing.T) {
Txn: apptxn2,
})
}
drop = rm.shouldDropInner(txns, now)
drop = rm.shouldDropInner(txns, nil, now)
require.True(t, drop)

drop = rm.shouldDropInner(txns, now.Add(2*window))
drop = rm.shouldDropInner(txns, nil, now.Add(2*window))
require.True(t, drop)
}

Expand All @@ -116,17 +116,17 @@ func TestAppRateLimiter_Interval(t *testing.T) {
// 0.9 is calculated as 1 - 0.1 (fraction of the interval elapsed)
// since the next interval at second 21 would by 1 sec (== 10% == 0.1) after the interval beginning
for i := 0; i < int(0.8*float64(rate)); i++ {
drop := rm.shouldDropInner(txns, now)
drop := rm.shouldDropInner(txns, nil, now)
require.False(t, drop)
}

next := now.Add(window)
for i := 0; i < int(0.3*float64(rate)); i++ {
drop := rm.shouldDropInner(txns, next)
drop := rm.shouldDropInner(txns, nil, next)
require.False(t, drop)
}

drop := rm.shouldDropInner(txns, next)
drop := rm.shouldDropInner(txns, nil, next)
require.True(t, drop)
}

Expand All @@ -146,17 +146,17 @@ func TestAppRateLimiter_IntervalSkip(t *testing.T) {
// ensure all capacity is available

for i := 0; i < int(0.8*float64(rate)); i++ {
drop := rm.shouldDropInner(txns, now)
drop := rm.shouldDropInner(txns, nil, now)
require.False(t, drop)
}

nextnext := now.Add(2 * window)
for i := 0; i < int(rate); i++ {
drop := rm.shouldDropInner(txns, nextnext)
drop := rm.shouldDropInner(txns, nil, nextnext)
require.False(t, drop)
}

drop := rm.shouldDropInner(txns, nextnext)
drop := rm.shouldDropInner(txns, nil, nextnext)
require.True(t, drop)
}

Expand All @@ -172,7 +172,7 @@ func TestAppRateLimiter_MaxSize(t *testing.T) {
rm := MakeAppRateLimiter(size, rate, window)

for i := 1; i <= int(size)+1; i++ {
drop := rm.shouldDrop(getAppTxnGroup(basics.AppIndex(i)))
drop := rm.shouldDrop(getAppTxnGroup(basics.AppIndex(i)), nil)
require.False(t, drop)
}

Expand Down
2 changes: 1 addition & 1 deletion data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}

// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil && handler.appLimiter.shouldDrop(unverifiedTxGroup) {
if handler.appLimiter != nil && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.RemotePeer).GetIPv4Bytes()) {
return network.OutgoingMessage{Action: network.Ignore}
}

Expand Down
29 changes: 29 additions & 0 deletions network/p2pPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"net"
"time"

"github.com/algorand/go-algorand/logging"
"github.com/algorand/websocket"

"github.com/libp2p/go-libp2p/core/network"
yamux "github.com/libp2p/go-yamux/v4"
multiaddr "github.com/multiformats/go-multiaddr/net"
)

type wsPeerConnP2PImpl struct {
Expand Down Expand Up @@ -82,3 +84,30 @@ func (c *wsPeerConnP2PImpl) CloseWithoutFlush() error {
}

func (c *wsPeerConnP2PImpl) UnderlyingConn() net.Conn { return nil }

func (c *wsPeerConnP2PImpl) RemoteAddr() net.Addr {
netaddr, err := multiaddr.ToNetAddr(c.stream.Conn().RemoteMultiaddr())
if err != nil {
logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err)
}
return netaddr
}

func (c *wsPeerConnP2PImpl) GetIPv4Bytes() []byte {
netaddr, err := multiaddr.ToNetAddr(c.stream.Conn().RemoteMultiaddr())
if err != nil {
logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err)
return nil
}
return netaddr.(*net.TCPAddr).IP.To4()
}

func (c *wsPeerConnP2PImpl) GetPort() uint16 {
netaddr, err := multiaddr.ToNetAddr(c.stream.Conn().RemoteMultiaddr())
if err != nil {
logging.Base().Errorf("Error converting multiaddr to netaddr: %v", err)
return 0
}

return uint16(netaddr.(*net.TCPAddr).Port)
}
15 changes: 15 additions & 0 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ var defaultSendMessageTags = map[protocol.Tag]bool{

// interface allows substituting debug implementation for *websocket.Conn
type wsPeerWebsocketConn interface {
RemoteAddr() net.Addr
RemoteAddrString() string
NextReader() (int, io.Reader, error)
WriteMessage(int, []byte) error
Expand Down Expand Up @@ -324,6 +325,12 @@ type HTTPPeer interface {
GetHTTPClient() *http.Client
}

// RemotePeer is used for fetching sender's IP for rate limiting and logging.
type RemotePeer interface {
GetIPv4Bytes() []byte
GetPort() uint16
}

// UnicastPeer is another possible interface for the opaque Peer.
// It is possible that we can only initiate a connection to a peer over websockets.
type UnicastPeer interface {
Expand Down Expand Up @@ -372,6 +379,14 @@ func (wp *wsPeer) Version() string {
return wp.version
}

func (wp *wsPeer) GetIPv4Bytes() []byte {
return wp.conn.RemoteAddr().(*net.TCPAddr).IP.To4()
}

func (wp *wsPeer) GetPort() uint16 {
return uint16(wp.conn.RemoteAddr().(*net.TCPAddr).Port)
}

// Unicast sends the given bytes to this specific peer. Does not wait for message to be sent.
// (Implements UnicastPeer)
func (wp *wsPeer) Unicast(ctx context.Context, msg []byte, tag protocol.Tag) error {
Expand Down

0 comments on commit 37de57d

Please sign in to comment.