Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, orderwatch, meshdb: Implement a dynamically decreasing max expiration time for orders #450

Merged
merged 40 commits into from Oct 24, 2019
Merged
Changes from 32 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3c2e9ea
Add expiration time index
albrow Oct 15, 2019
82d8c06
Implement and test TrimOrdersByExpirationTime
albrow Oct 15, 2019
b411e10
Integrate TrimOrdersByExpirationTime in OrderWatcher
albrow Oct 16, 2019
b957707
Add an env var for max orders in storage
albrow Oct 16, 2019
1dc6401
Reject orders with an expiration time greater than the current max
albrow Oct 16, 2019
7fc2f88
Change default max orders to 100k
albrow Oct 17, 2019
a09bc23
Update constants/constants.go
albrow Oct 17, 2019
065cb1f
Update zeroex/orderwatch/order_watcher.go
albrow Oct 17, 2019
10acf12
Remove in-memory state for orders removed due to expiration time
albrow Oct 17, 2019
6837d34
Clarify that UnlimitedExpirationTime = 2^256-1
albrow Oct 17, 2019
88874a0
Implement and test SlowCounter
albrow Oct 18, 2019
b1962dc
Use SlowCounter inside OrderWatcher to increase max expiration time
albrow Oct 18, 2019
ca40d1e
Add todo about loading max expiration time from db
albrow Oct 18, 2019
5a873e8
Use big.Float instead of big.Rat in SlowCounter
albrow Oct 18, 2019
dc8fa51
Add a configurable starting offset to SlowCounter
albrow Oct 18, 2019
75bd78c
Add test for SlowCounter.Reset
albrow Oct 18, 2019
5d0cdbb
Save MaxExpirationTime in database
albrow Oct 18, 2019
c218452
Check if we can increase max expiration time after an order is perman…
albrow Oct 18, 2019
ef33b0b
Handle some additional edge cases where the database could be empty
albrow Oct 19, 2019
45a2812
Simplify SlowCounter implementation and remove concept of "ticks"
albrow Oct 21, 2019
acc74b7
Ensure max expiration time is not in the past
albrow Oct 21, 2019
b8a767f
Add MaxExpirationTime to GetStats
albrow Oct 21, 2019
9229616
Start a separate loop inside OrderWatcher for increasing max exp time
albrow Oct 22, 2019
af7d71d
Add maxOrdersInStorage config option to TypeScript bindings
albrow Oct 22, 2019
a609ccb
Add note about default values to TypeScript bindings
albrow Oct 22, 2019
0969ef7
Add TODO in OrderWatcher about extending transaction
albrow Oct 22, 2019
607df23
Rename a function in OrderWatcher
albrow Oct 22, 2019
a9be34d
Add isMax boolean cache to SlowCounter
albrow Oct 22, 2019
fbd5531
Add test in order_watcher_test for max expiration time
albrow Oct 22, 2019
8d920b2
Rename REMOVED order event to STOPPED_WATCHING
albrow Oct 22, 2019
b2d7567
Add new maxExpirationTime field to docs/rpc_api.md
albrow Oct 22, 2019
a5bfe82
Update CHANGELOG
albrow Oct 22, 2019
ddf6777
Update zeroex/orderwatch/order_watcher.go
albrow Oct 24, 2019
7e63d30
Update zeroex/orderwatch/order_watcher.go
albrow Oct 24, 2019
34fb304
Update zeroex/orderwatch/slowcounter/slow_counter.go
albrow Oct 24, 2019
38e0b87
Return an error for missing config fields in orderwatch.New
albrow Oct 24, 2019
3957058
Merge branch 'feature/dynamic-max-expiration' of github.com:0xProject…
albrow Oct 24, 2019
30c9abb
Add hack in Watcher.Add for orders that suddenly expire too late
albrow Oct 24, 2019
e346f4d
Update zeroex/orderwatch/order_watcher.go
albrow Oct 24, 2019
437373b
Add STOPPED_WATCHING event and missing default for TypeScript bindings
albrow Oct 24, 2019
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -2,6 +2,12 @@

This changelog is a work in progress and may contain notes for versions which have not actually been released. Check the [Releases](https://github.com/0xProject/0x-mesh/releases) page to see full release notes and more information about the latest released versions.

## v5.2.0-beta

### Features ✅

- Implemented a new strategy for limiting the amount of database storage used by Mesh and removing orders when the database is full. This strategy involves a dynamically adjusting maximum expiration time. When the database is full, Mesh will enforce a maximum expiration time for all incoming orders and remove any existing orders with an expiration time too far in the future. If conditions change and there is enough space in the database again, the max expiration time will slowly increase. This is a short term solution which solves the immediate issue of finite storage capacities and does a decent job of protecting against spam. We expect to improve and possibly replace it in the future. See [#450](https://github.com/0xProject/0x-mesh/pull/450) for more details.

## v5.1.0-beta

### Features ✅
@@ -123,6 +123,9 @@ func convertConfig(jsConfig js.Value) (core.Config, error) {
if customContractAddresses := jsConfig.Get("customContractAddresses"); !isNullOrUndefined(customContractAddresses) {
config.CustomContractAddresses = customContractAddresses.String()
}
if maxOrdersInStorage := jsConfig.Get("maxOrdersInStorage"); !isNullOrUndefined(maxOrdersInStorage) {
config.MaxOrdersInStorage = maxOrdersInStorage.Int()
}

return config, nil
}
@@ -66,14 +66,13 @@ export interface Config {
// Parity, feel free to double the default max in order to reduce the number
// of RPC calls made by Mesh. Defaults to 524288 bytes.
ethereumRPCMaxContentLength?: number;
// customContractAddresses is set of custom addresses to use for the
// configured network ID. The contract addresses for most common networks
// are already included by default, so this is typically only needed for
// testing on custom networks. The given addresses are added to the default
// list of addresses for known networks and overriding any contract
// addresses for known networks is not allowed. The addresses for exchange,
// devUtils, erc20Proxy, and erc721Proxy are required for each network. For
// example:
// A set of custom addresses to use for the configured network ID. The
// contract addresses for most common networks are already included by
// default, so this is typically only needed for testing on custom networks.
// The given addresses are added to the default list of addresses for known
// networks and overriding any contract addresses for known networks is not
// allowed. The addresses for exchange, devUtils, erc20Proxy, and
// erc721Proxy are required for each network. For example:
//
// {
// exchange: "0x48bacb9266a570d521063ef5dd96e61686dbe788",
@@ -83,6 +82,11 @@ export interface Config {
// }
//
customContractAddresses?: ContractAddresses;
// The maximum number of orders that Mesh will keep in storage. As the
// number of orders in storage grows, Mesh will begin enforcing a limit on
// maximum expiration time for incoming orders and remove any orders with an
// expiration time too far in the future. Defaults to 100,000.
maxOrdersInStorage?: number;
}

export interface ContractAddresses {
@@ -133,6 +137,7 @@ interface WrapperConfig {
blockPollingIntervalSeconds?: number;
ethereumRPCMaxContentLength?: number;
customContractAddresses?: string; // json-encoded instead of Object.
maxOrdersInStorage?: number;
}

// The type for signed orders exposed by MeshWrapper. Unlike other types, the
@@ -2,6 +2,7 @@ package constants

import (
"errors"
"math/big"

"github.com/ethereum/go-ethereum/common"

@@ -60,3 +61,12 @@ var ErrInternal = errors.New("internal error")

// TestMaxContentLength is the max Ethereum RPC Content-Length used in tests
var TestMaxContentLength = 1024 * 512

// UnlimitedExpirationTime is the maximum value for uint256 (2^256-1), which
// means there is effectively no limit on the maximum expiration time for
// orders.
var UnlimitedExpirationTime *big.Int

func init() {
UnlimitedExpirationTime, _ = big.NewInt(0).SetString("115792089237316195423570985008687907853269984665640564039457584007913129639935", 10)
}
@@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum"
"github.com/0xProject/0x-mesh/ethereum/blockwatch"
@@ -114,6 +115,11 @@ type Config struct {
// }
//
CustomContractAddresses string `envvar:"CUSTOM_CONTRACT_ADDRESSES" default:""`
// MaxOrdersInStorage is the maximum number of orders that Mesh will keep in
// storage. As the number of orders in storage grows, Mesh will begin
// enforcing a limit on maximum expiration time for incoming orders and remove
// any orders with an expiration time too far in the future.
MaxOrdersInStorage int `envvar:"MAX_ORDERS_IN_STORAGE" default:"100000"`
}

type snapshotInfo struct {
@@ -178,8 +184,9 @@ func New(config Config) (*App, error) {
return nil, err
}

// Check if the DB has been previously intialized with a different networkId
if err = initNetworkID(config.EthereumNetworkID, meshDB); err != nil {
// Initialize metadata and check stored network id (if any).
metadata, err := initMetadata(config.EthereumNetworkID, meshDB)
if err != nil {
return nil, err
}

@@ -213,7 +220,15 @@ func New(config Config) (*App, error) {
}

// Initialize order watcher (but don't start it yet).
orderWatcher, err := orderwatch.New(meshDB, blockWatcher, orderValidator, config.EthereumNetworkID, config.OrderExpirationBuffer)
orderWatcher, err := orderwatch.New(orderwatch.Config{
MeshDB: meshDB,
BlockWatcher: blockWatcher,
OrderValidator: orderValidator,
NetworkID: config.EthereumNetworkID,
ExpirationBuffer: config.OrderExpirationBuffer,
MaxOrders: config.MaxOrdersInStorage,
MaxExpirationTime: metadata.MaxExpirationTime,
})
if err != nil {
return nil, err
}
@@ -297,27 +312,30 @@ func initPrivateKey(path string) (p2pcrypto.PrivKey, error) {
return nil, err
}

func initNetworkID(networkID int, meshDB *meshdb.MeshDB) error {
func initMetadata(networkID int, meshDB *meshdb.MeshDB) (*meshdb.Metadata, error) {
metadata, err := meshDB.GetMetadata()
if err != nil {
if _, ok := err.(db.NotFoundError); ok {
// No stored metadata found (first startup)
metadata = &meshdb.Metadata{EthereumNetworkID: networkID}
metadata = &meshdb.Metadata{
EthereumNetworkID: networkID,
MaxExpirationTime: constants.UnlimitedExpirationTime,
}
if err := meshDB.SaveMetadata(metadata); err != nil {
return err
return nil, err
}
return nil
return metadata, nil
}
return err
return nil, err
}

// on subsequent startups, verify we are on the same network
if metadata.EthereumNetworkID != networkID {
err := fmt.Errorf("expected networkID to be %d but got %d", metadata.EthereumNetworkID, networkID)
log.WithError(err).Error("Mesh previously started on different Ethereum network; switch networks or remove DB")
return err
return nil, err
}
return nil
return metadata, nil
}

func (app *App) Start(ctx context.Context) error {
@@ -730,6 +748,7 @@ func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
NumOrders: numOrders,
NumPeers: app.node.GetNumPeers(),
NumOrdersIncludingRemoved: numOrdersIncludingRemoved,
MaxExpirationTime: app.orderWatcher.MaxExpirationTime().String(),
}
return response, nil
}
@@ -758,6 +777,7 @@ func (app *App) periodicallyLogStats(ctx context.Context) {
"numOrders": stats.NumOrders,
"numOrdersIncludingRemoved": stats.NumOrdersIncludingRemoved,
"numPeers": stats.NumPeers,
"maxExpirationTime": stats.MaxExpirationTime,
}).Info("current stats")
}
}
@@ -17,14 +17,14 @@ func TestEthereumNetworkDetection(t *testing.T) {
defer meshDB.Close()

// simulate starting up on mainnet
err = initNetworkID(1, meshDB)
_, err = initMetadata(1, meshDB)
require.NoError(t, err)

// simulate restart on same network
err = initNetworkID(1, meshDB)
_, err = initMetadata(1, meshDB)
require.NoError(t, err)

// should error when attempting to start on different network
err = initNetworkID(2, meshDB)
_, err = initMetadata(2, meshDB)
assert.Error(t, err)
}
@@ -115,6 +115,15 @@ func (app *App) validateOrders(orders []*zeroex.SignedOrder) (*ordervalidator.Va
})
continue
}
if order.ExpirationTimeSeconds.Cmp(app.orderWatcher.MaxExpirationTime()) == 1 {
results.Rejected = append(results.Rejected, &ordervalidator.RejectedOrderInfo{
OrderHash: orderHash,
SignedOrder: order,
Kind: ordervalidator.MeshValidation,
Status: ordervalidator.ROMaxExpirationExceeded,
})
continue
}
// Note(albrow): Orders with a sender address can be canceled or invalidated
// off-chain which is difficult to support since we need to prune
// canceled/invalidated orders from the database. We can special-case some
@@ -204,7 +204,8 @@ Gets certain configurations and stats about a Mesh node.
},
"numPeers": 18,
"numOrders": 1095,
"numOrdersIncludingRemoved": 1134
"numOrdersIncludingRemoved": 1134,
"maxExpirationTime": "717784680"
},
"id": 1
}
@@ -5,6 +5,7 @@ import (
"math/big"
"time"

"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum/miniheader"
"github.com/0xProject/0x-mesh/zeroex"
@@ -35,6 +36,7 @@ func (o Order) ID() []byte {
// Metadata is the database representation of MeshDB instance metadata
type Metadata struct {
EthereumNetworkID int
MaxExpirationTime *big.Int
}

// ID returns the id used for the metadata collection (one per DB)
@@ -63,6 +65,7 @@ type OrdersCollection struct {
MakerAddressTokenAddressTokenIDIndex *db.Index
LastUpdatedIndex *db.Index
IsRemovedIndex *db.Index
ExpirationTimeIndex *db.Index
}

// MetadataCollection represents a DB collection used to store instance metadata
@@ -115,7 +118,7 @@ func setupOrders(database *db.DB) (*OrdersCollection, error) {
// unsigned 256 bit integer is 80, so we pad with zeroes such that the
// length of the number is always 80.
signedOrder := m.(*Order).SignedOrder
index := []byte(fmt.Sprintf("%s|%080s", signedOrder.MakerAddress.Hex(), signedOrder.Salt.String()))
index := []byte(fmt.Sprintf("%s|%s", signedOrder.MakerAddress.Hex(), uint256ToConstantLengthBytes(signedOrder.Salt)))
return index
})
// TODO(fabio): Optimize this index callback since it gets called many times under-the-hood.
@@ -150,12 +153,18 @@ func setupOrders(database *db.DB) (*OrdersCollection, error) {
return []byte{0}
})

expirationTimeIndex := col.AddIndex("expirationTime", func(m db.Model) []byte {
order := m.(*Order)
return uint256ToConstantLengthBytes(order.SignedOrder.ExpirationTimeSeconds)
})

return &OrdersCollection{
Collection: col,
MakerAddressTokenAddressTokenIDIndex: makerAddressTokenAddressTokenIDIndex,
MakerAddressAndSaltIndex: makerAddressAndSaltIndex,
LastUpdatedIndex: lastUpdatedIndex,
IsRemovedIndex: isRemovedIndex,
ExpirationTimeIndex: expirationTimeIndex,
}, nil
}

@@ -170,7 +179,7 @@ func setupMiniHeaders(database *db.DB) (*MiniHeadersCollection, error) {
// unsigned 256 bit integer is 80, so we pad with zeroes such that the
// length of the number is always 80.
number := model.(*miniheader.MiniHeader).Number
return []byte(fmt.Sprintf("%080s", number.String()))
return uint256ToConstantLengthBytes(number)
})

return &MiniHeadersCollection{
@@ -250,7 +259,7 @@ func (m *MeshDB) FindOrdersByMakerAddressAndMaxSalt(makerAddress common.Address,
// particular use-case, we add 1 to the supplied salt (making the query inclusive instead)
saltPlusOne := new(big.Int).Add(salt, big.NewInt(1))
start := []byte(fmt.Sprintf("%s|%080s", makerAddress.Hex(), "0"))
limit := []byte(fmt.Sprintf("%s|%080s", makerAddress.Hex(), saltPlusOne.String()))
limit := []byte(fmt.Sprintf("%s|%s", makerAddress.Hex(), uint256ToConstantLengthBytes(saltPlusOne)))
filter := m.Orders.MakerAddressAndSaltIndex.RangeFilter(start, limit)
orders := []*Order{}
if err := m.Orders.NewQuery(filter).Run(&orders); err != nil {
@@ -281,14 +290,36 @@ func (m *MeshDB) GetMetadata() (*Metadata, error) {
return &metadata, nil
}

// SaveMetadata inserts the metadata into the database.
// SaveMetadata inserts the metadata into the database, overwriting any existing
// metadata.
func (m *MeshDB) SaveMetadata(metadata *Metadata) error {
if err := m.metadata.Insert(metadata); err != nil {
return err
}
return nil
}

// UpdateMetadata updates the metadata in the database via a transaction. It
// accepts a callback function which will be provided with the old metadata and
// should return the new metadata to save.
func (m *MeshDB) UpdateMetadata(updater func(oldmetadata Metadata) (newMetadata Metadata)) error {
txn := m.metadata.OpenTransaction()
defer func() {
_ = txn.Discard()
}()

oldMetadata, err := m.GetMetadata()
if err != nil {
return err
}
newMetadata := updater(*oldMetadata)
if err := txn.Update(&newMetadata); err != nil {
return err
}

return txn.Commit()
}

type singleAssetData struct {
Address common.Address
TokenID *big.Int
@@ -355,3 +386,55 @@ func parseContractAddressesAndTokenIdsFromAssetData(assetData []byte) ([]singleA
}
return singleAssetDatas, nil
}

func uint256ToConstantLengthBytes(v *big.Int) []byte {
return []byte(fmt.Sprintf("%080s", v.String()))
}

// TrimOrdersByExpirationTime removes existing orders with the highest
// expiration time until the number of remaining orders is <= targetMaxOrders.
// It returns any orders that were removed and the new max expiration time that
// can be used to eliminate incoming orders that expire too far in the future.
func (m *MeshDB) TrimOrdersByExpirationTime(targetMaxOrders int) (newMaxExpirationTime *big.Int, removedOrders []*Order, err error) {
txn := m.Orders.OpenTransaction()
defer func() {
_ = txn.Discard()
}()

numOrders, err := m.Orders.Count()
if err != nil {
return nil, nil, err
}
if numOrders <= targetMaxOrders {
// If the number of orders is less than the target, we don't need to remove
// any orders. Return UnlimitedExpirationTime.
return constants.UnlimitedExpirationTime, nil, nil
}

// Find the orders which we need to remove.
filter := m.Orders.ExpirationTimeIndex.All()
numOrdersToRemove := numOrders - targetMaxOrders
if err := m.Orders.NewQuery(filter).Reverse().Max(numOrdersToRemove).Run(&removedOrders); err != nil {
return nil, nil, err
}

// Remove those orders and commit the transaction.
for _, order := range removedOrders {
if err := txn.Delete(order.Hash.Bytes()); err != nil {
return nil, nil, err
}
}
if err := txn.Commit(); err != nil {
return nil, nil, err
}

// The new max expiration time is simply the minimum expiration time of the
// orders that were removed (i.e., the expiration time of the last order in
// the slice). We add a buffer of -1 just to make sure we don't exceed
// targetMaxOrders. This means it is technically possible that there are a
// number of orders currently in the database that exceed the max expiration
// time, but no new orders that exceed this time will be added.
newMaxExpirationTime = removedOrders[len(removedOrders)-1].SignedOrder.ExpirationTimeSeconds
newMaxExpirationTime = newMaxExpirationTime.Sub(newMaxExpirationTime, big.NewInt(1))
return newMaxExpirationTime, removedOrders, nil
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.