Skip to content

Commit

Permalink
allow pool to be blocked (#1899)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos committed Apr 6, 2023
1 parent 0949fa2 commit 322bb5a
Show file tree
Hide file tree
Showing 15 changed files with 382 additions and 293 deletions.
10 changes: 6 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,9 @@ func startProfilingHttpServer(c metrics.Config) {
mux.HandleFunc(metrics.ProfilingSymbolEndpoint, pprof.Symbol)
mux.HandleFunc(metrics.ProfilingTraceEndpoint, pprof.Trace)
profilingServer := &http.Server{
Handler: mux,
ReadTimeout: two * time.Minute,
Handler: mux,
ReadHeaderTimeout: two * time.Minute,
ReadTimeout: two * time.Minute,
}
log.Infof("profiling server listening on port %d", c.ProfilingPort)
if err := profilingServer.Serve(lis); err != nil {
Expand All @@ -426,8 +427,9 @@ func startMetricsHttpServer(c metrics.Config) {
mux.Handle(metrics.Endpoint, promhttp.Handler())

metricsServer := &http.Server{
Handler: mux,
ReadTimeout: ten * time.Second,
Handler: mux,
ReadHeaderTimeout: ten * time.Second,
ReadTimeout: ten * time.Second,
}
log.Infof("metrics server listening on port %d", c.Port)
if err := metricsServer.Serve(lis); err != nil {
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ MaxConns = 200
[Pool]
FreeClaimGasLimit = 150000
IntervalToRefreshBlockedAddresses = "5m"
MaxTxBytesSize=30132
MaxTxDataBytesSize=30000
DefaultMinGasPriceAllowed = 1000000000
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ MaxConns = 200

[Pool]
FreeClaimGasLimit = 1500000
IntervalToRefreshBlockedAddresses = "5m"
MaxTxBytesSize=30132
MaxTxDataBytesSize=30000
DefaultMinGasPriceAllowed = 1000000000
Expand Down
1 change: 1 addition & 0 deletions config/environments/public/public.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ MaxConns = 200

[Pool]
FreeClaimGasLimit = 1500000
IntervalToRefreshBlockedAddresses = "5m"
MaxTxBytesSize=30132
MaxTxDataBytesSize=30000
DefaultMinGasPriceAllowed = 1000000000
Expand Down
2 changes: 1 addition & 1 deletion db/migrations/pool/0006.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ ADD COLUMN deposit_count BIGINT;

-- +migrate Down
ALTER TABLE pool.transaction
DROP COLUMN deposit_count;
DROP COLUMN deposit_count;
8 changes: 8 additions & 0 deletions db/migrations/pool/0007.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- +migrate Up
CREATE TABLE pool.blocked
(
addr varchar NOT NULL PRIMARY KEY
);

-- +migrate Down
DROP TABLE pool.blocked;
14 changes: 8 additions & 6 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ func (s *Server) startHTTP() error {
mux.Handle("/", tollbooth.LimitFuncHandler(lmt, s.handle))

s.srv = &http.Server{
Handler: mux,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
Handler: mux,
ReadHeaderTimeout: s.config.ReadTimeoutInSec * time.Second,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
}
log.Infof("http server started: %s", address)
if err := s.srv.Serve(lis); err != nil {
Expand Down Expand Up @@ -160,9 +161,10 @@ func (s *Server) startWS() {
mux.HandleFunc("/", s.handleWs)

s.wsSrv = &http.Server{
Handler: mux,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
Handler: mux,
ReadHeaderTimeout: s.config.ReadTimeoutInSec * time.Second,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
}
s.wsUpgrader = websocket.Upgrader{
ReadBufferSize: wsBufferSizeLimitInBytes,
Expand Down
4 changes: 4 additions & 0 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type Config struct {
// FreeClaimGasLimit is the max gas allowed use to do a free claim
FreeClaimGasLimit uint64 `mapstructure:"FreeClaimGasLimit"`

// IntervalToRefreshBlockedAddresses is the time it takes to sync the
// blocked address list from db to memory
IntervalToRefreshBlockedAddresses types.Duration `mapstructure:"IntervalToRefreshBlockedAddresses"`

// MaxTxBytesSize is the max size of a transaction in bytes
MaxTxBytesSize uint64 `mapstructure:"MaxTxBytesSize"`

Expand Down
3 changes: 3 additions & 0 deletions pool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")

// ErrBlockedSender is returned if the transaction is sent by a blocked account.
ErrBlockedSender = errors.New("blocked sender")

// ErrNonceTooLow is returned if the nonce of a transaction is lower than the
// one present in the local chain.
ErrNonceTooLow = errors.New("nonce too low")
Expand Down
1 change: 1 addition & 0 deletions pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type storage interface {
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
GetAllAddressesBlocked(ctx context.Context) ([]common.Address, error)
MinGasPriceSince(ctx context.Context, timestamp time.Time) (uint64, error)
DepositCountExists(ctx context.Context, depositCount uint64) (bool, error)
}
Expand Down
27 changes: 27 additions & 0 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,33 @@ func (p *PostgresPoolStorage) UpdateTxWIPStatus(ctx context.Context, hash common
return nil
}

// GetAllAddressesBlocked get all addresses blocked
func (p *PostgresPoolStorage) GetAllAddressesBlocked(ctx context.Context) ([]common.Address, error) {
sql := `SELECT addr FROM pool.blocked`

rows, err := p.db.Query(ctx, sql)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
} else {
return nil, err
}
}
defer rows.Close()

var addrs []common.Address
for rows.Next() {
var addr string
err := rows.Scan(&addr)
if err != nil {
return nil, err
}
addrs = append(addrs, common.HexToAddress(addr))
}

return addrs, nil
}

// DepositCountExists checks if already exists a transaction in the pool with the
// provided deposit count
func (p *PostgresPoolStorage) DepositCountExists(ctx context.Context, depositCount uint64) (bool, error) {
Expand Down
50 changes: 49 additions & 1 deletion pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Pool struct {
l2BridgeAddr common.Address
chainID uint64
cfg Config
blockedAddresses sync.Map
minSuggestedGasPrice *big.Int
minSuggestedGasPriceMux *sync.RWMutex
eventLog *event.EventLog
Expand All @@ -58,15 +59,56 @@ type preExecutionResponse struct {

// NewPool creates and initializes an instance of Pool
func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64, eventLog *event.EventLog) *Pool {
return &Pool{
p := &Pool{
cfg: cfg,
storage: s,
state: st,
l2BridgeAddr: l2BridgeAddr,
chainID: chainID,
blockedAddresses: sync.Map{},
minSuggestedGasPriceMux: new(sync.RWMutex),
eventLog: eventLog,
}

p.refreshBlockedAddresses()
go func(cfg *Config, p *Pool) {
for {
time.Sleep(cfg.IntervalToRefreshBlockedAddresses.Duration)
p.refreshBlockedAddresses()
}
}(&cfg, p)
return p
}

// refreshBlockedAddresses refreshes the list of blocked addresses for the provided instance of pool
func (p *Pool) refreshBlockedAddresses() {
blockedAddresses, err := p.storage.GetAllAddressesBlocked(context.Background())
if err != nil {
log.Error("failed to load blocked addresses")
return
}

blockedAddressesMap := sync.Map{}
for _, blockedAddress := range blockedAddresses {
blockedAddressesMap.Store(blockedAddress.String(), 1)
p.blockedAddresses.Store(blockedAddress.String(), 1)
}

unblockedAddresses := []string{}
p.blockedAddresses.Range(func(key, value any) bool {
addrHex := key.(string)
_, found := blockedAddressesMap.Load(addrHex)
if found {
return true
}

unblockedAddresses = append(unblockedAddresses, addrHex)
return true
})

for _, unblockedAddress := range unblockedAddresses {
p.blockedAddresses.Delete(unblockedAddress)
}
}

// StartPollingMinSuggestedGasPrice starts polling the minimum suggested gas price
Expand Down Expand Up @@ -300,6 +342,12 @@ func (p *Pool) validateTx(ctx context.Context, poolTx Transaction) error {
return ErrInvalidSender
}

// check if sender is blocked
_, blocked := p.blockedAddresses.Load(from.String())
if blocked {
return ErrBlockedSender
}

lastL2Block, err := p.state.GetLastL2Block(ctx, nil)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 322bb5a

Please sign in to comment.