Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow pool to be blocked #1899

Merged
merged 9 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,9 @@ func startProfilingHttpServer(c metrics.Config) {
mux.HandleFunc(metrics.ProfilingSymbolEndpoint, pprof.Symbol)
mux.HandleFunc(metrics.ProfilingTraceEndpoint, pprof.Trace)
profilingServer := &http.Server{
Handler: mux,
ReadTimeout: two * time.Minute,
Handler: mux,
ReadHeaderTimeout: two * time.Minute,
ReadTimeout: two * time.Minute,
}
log.Infof("profiling server listening on port %d", c.ProfilingPort)
if err := profilingServer.Serve(lis); err != nil {
Expand All @@ -426,8 +427,9 @@ func startMetricsHttpServer(c metrics.Config) {
mux.Handle(metrics.Endpoint, promhttp.Handler())

metricsServer := &http.Server{
Handler: mux,
ReadTimeout: ten * time.Second,
Handler: mux,
ReadHeaderTimeout: ten * time.Second,
ReadTimeout: ten * time.Second,
}
log.Infof("metrics server listening on port %d", c.Port)
if err := metricsServer.Serve(lis); err != nil {
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ MaxConns = 200

[Pool]
FreeClaimGasLimit = 150000
IntervalToRefreshBlockedAddresses = "5m"
MaxTxBytesSize=30132
MaxTxDataBytesSize=30000
DefaultMinGasPriceAllowed = 1000000000
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ MaxConns = 200

[Pool]
FreeClaimGasLimit = 1500000
IntervalToRefreshBlockedAddresses = "5m"
MaxTxBytesSize=30132
MaxTxDataBytesSize=30000
DefaultMinGasPriceAllowed = 1000000000
Expand Down
1 change: 1 addition & 0 deletions config/environments/public/public.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ MaxConns = 200

[Pool]
FreeClaimGasLimit = 1500000
IntervalToRefreshBlockedAddresses = "5m"
MaxTxBytesSize=30132
MaxTxDataBytesSize=30000
DefaultMinGasPriceAllowed = 1000000000
Expand Down
2 changes: 1 addition & 1 deletion db/migrations/pool/0006.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ ADD COLUMN deposit_count BIGINT;

-- +migrate Down
ALTER TABLE pool.transaction
DROP COLUMN deposit_count;
DROP COLUMN deposit_count;
8 changes: 8 additions & 0 deletions db/migrations/pool/0007.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- +migrate Up
CREATE TABLE pool.blocked
(
addr varchar NOT NULL PRIMARY KEY
);

-- +migrate Down
DROP TABLE pool.blocked;
14 changes: 8 additions & 6 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,10 @@ func (s *Server) startHTTP() error {
mux.Handle("/", tollbooth.LimitFuncHandler(lmt, s.handle))

s.srv = &http.Server{
Handler: mux,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
Handler: mux,
ReadHeaderTimeout: s.config.ReadTimeoutInSec * time.Second,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
}
log.Infof("http server started: %s", address)
if err := s.srv.Serve(lis); err != nil {
Expand Down Expand Up @@ -160,9 +161,10 @@ func (s *Server) startWS() {
mux.HandleFunc("/", s.handleWs)

s.wsSrv = &http.Server{
Handler: mux,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
Handler: mux,
ReadHeaderTimeout: s.config.ReadTimeoutInSec * time.Second,
ReadTimeout: s.config.ReadTimeoutInSec * time.Second,
WriteTimeout: s.config.WriteTimeoutInSec * time.Second,
}
s.wsUpgrader = websocket.Upgrader{
ReadBufferSize: wsBufferSizeLimitInBytes,
Expand Down
4 changes: 4 additions & 0 deletions pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type Config struct {
// FreeClaimGasLimit is the max gas allowed use to do a free claim
FreeClaimGasLimit uint64 `mapstructure:"FreeClaimGasLimit"`

// IntervalToRefreshBlockedAddresses is the time it takes to sync the
// blocked address list from db to memory
IntervalToRefreshBlockedAddresses types.Duration `mapstructure:"IntervalToRefreshBlockedAddresses"`

// MaxTxBytesSize is the max size of a transaction in bytes
MaxTxBytesSize uint64 `mapstructure:"MaxTxBytesSize"`

Expand Down
3 changes: 3 additions & 0 deletions pool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")

// ErrBlockedSender is returned if the transaction is sent by a blocked account.
ErrBlockedSender = errors.New("blocked sender")

// ErrNonceTooLow is returned if the nonce of a transaction is lower than the
// one present in the local chain.
ErrNonceTooLow = errors.New("nonce too low")
Expand Down
1 change: 1 addition & 0 deletions pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type storage interface {
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
GetAllAddressesBlocked(ctx context.Context) ([]common.Address, error)
MinGasPriceSince(ctx context.Context, timestamp time.Time) (uint64, error)
DepositCountExists(ctx context.Context, depositCount uint64) (bool, error)
}
Expand Down
27 changes: 27 additions & 0 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,33 @@ func (p *PostgresPoolStorage) UpdateTxWIPStatus(ctx context.Context, hash common
return nil
}

// GetAllAddressesBlocked get all addresses blocked
func (p *PostgresPoolStorage) GetAllAddressesBlocked(ctx context.Context) ([]common.Address, error) {
sql := `SELECT addr FROM pool.blocked`

rows, err := p.db.Query(ctx, sql)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
} else {
return nil, err
}
}
defer rows.Close()

var addrs []common.Address
for rows.Next() {
var addr string
err := rows.Scan(&addr)
if err != nil {
return nil, err
}
addrs = append(addrs, common.HexToAddress(addr))
}

return addrs, nil
}

// DepositCountExists checks if already exists a transaction in the pool with the
// provided deposit count
func (p *PostgresPoolStorage) DepositCountExists(ctx context.Context, depositCount uint64) (bool, error) {
Expand Down
50 changes: 49 additions & 1 deletion pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Pool struct {
l2BridgeAddr common.Address
chainID uint64
cfg Config
blockedAddresses sync.Map
minSuggestedGasPrice *big.Int
minSuggestedGasPriceMux *sync.RWMutex
eventLog *event.EventLog
Expand All @@ -58,15 +59,56 @@ type preExecutionResponse struct {

// NewPool creates and initializes an instance of Pool
func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64, eventLog *event.EventLog) *Pool {
return &Pool{
p := &Pool{
cfg: cfg,
storage: s,
state: st,
l2BridgeAddr: l2BridgeAddr,
chainID: chainID,
blockedAddresses: sync.Map{},
minSuggestedGasPriceMux: new(sync.RWMutex),
eventLog: eventLog,
}

p.refreshBlockedAddresses()
go func(cfg *Config, p *Pool) {
for {
time.Sleep(cfg.IntervalToRefreshBlockedAddresses.Duration)
p.refreshBlockedAddresses()
}
}(&cfg, p)
return p
}

// refreshBlockedAddresses refreshes the list of blocked addresses for the provided instance of pool
func (p *Pool) refreshBlockedAddresses() {
blockedAddresses, err := p.storage.GetAllAddressesBlocked(context.Background())
if err != nil {
log.Error("failed to load blocked addresses")
return
}

blockedAddressesMap := sync.Map{}
for _, blockedAddress := range blockedAddresses {
blockedAddressesMap.Store(blockedAddress.String(), 1)
p.blockedAddresses.Store(blockedAddress.String(), 1)
}

unblockedAddresses := []string{}
p.blockedAddresses.Range(func(key, value any) bool {
addrHex := key.(string)
_, found := blockedAddressesMap.Load(addrHex)
if found {
return true
}

unblockedAddresses = append(unblockedAddresses, addrHex)
return true
})

for _, unblockedAddress := range unblockedAddresses {
p.blockedAddresses.Delete(unblockedAddress)
}
}

// StartPollingMinSuggestedGasPrice starts polling the minimum suggested gas price
Expand Down Expand Up @@ -300,6 +342,12 @@ func (p *Pool) validateTx(ctx context.Context, poolTx Transaction) error {
return ErrInvalidSender
}

// check if sender is blocked
_, blocked := p.blockedAddresses.Load(from.String())
if blocked {
return ErrBlockedSender
}

lastL2Block, err := p.state.GetLastL2Block(ctx, nil)
if err != nil {
return err
Expand Down
Loading