Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orderwatch: Update lastUpdated and fillableTakerAssetAmount for order in DB #386

Merged
merged 8 commits into from Sep 5, 2019
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,8 @@ This changelog is a work in progress and may contain notes for versions which ha
### Bug fixes 🐞

- Fixed two related bugs: One where order expiration events would be emitted multiple times and another that meant subsequent fill/cancel events for orders deemed expired were not emitted. Fills/cancels for expired orders will continue to be emitted if they occur within ~4 mins (i.e. 20 blocks) of the expiration ([#385](https://github.com/0xProject/0x-mesh/pull/385)).
- Fixed a data race-condition in OrderWatcher that could have caused order collection updates to be overwritten in the DB. ([#386](https://github.com/0xProject/0x-mesh/pull/386))
- Fixed a bug where `fillableTakerAssetAmount` and `lastUpdated` were not always being properly updated in the DB. ([#386](https://github.com/0xProject/0x-mesh/pull/386))
- Fixed some issues with key prefixes for certain types not being applied correctly to logs ([#375](https://github.com/0xProject/0x-mesh/pull/375)).
- Fixed an issue where order hashes were not being correctly logged ([#368](https://github.com/0xProject/0x-mesh/pull/368)).
- Mesh will now properly shut down if the database is unexpectedly closed ([#370](https://github.com/0xProject/0x-mesh/pull/370)).
Expand Down
88 changes: 60 additions & 28 deletions zeroex/orderwatch/order_watcher.go
Expand Up @@ -226,7 +226,7 @@ func (w *Watcher) handleExpiration(expiredOrders []expirationwatch.ExpiredItem)
FillableTakerAssetAmount: big.NewInt(0),
OrderStatus: zeroex.OSExpired,
}
w.unwatchOrder(order, order.FillableTakerAssetAmount)
w.unwatchOrder(w.meshDB.Orders, order, order.FillableTakerAssetAmount)

orderEvent := &zeroex.OrderEvent{
OrderHash: orderInfo.OrderHash,
Expand All @@ -240,6 +240,10 @@ func (w *Watcher) handleExpiration(expiredOrders []expirationwatch.ExpiredItem)
}

func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
ordersColTxn := w.meshDB.Orders.OpenTransaction()
defer func() {
_ = ordersColTxn.Discard()
}()
hashToOrderWithTxHashes := map[common.Hash]*OrderWithTxHashes{}
for _, event := range events {
for _, log := range event.BlockHeader.Logs {
Expand Down Expand Up @@ -272,7 +276,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
}
return err
}
orders, err = w.findOrdersAndGenerateOrderEvents(transferEvent.From, log.Address, nil)
orders, err = w.findOrders(transferEvent.From, log.Address, nil)
if err != nil {
return err
}
Expand All @@ -290,7 +294,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
if approvalEvent.Spender != w.contractAddresses.ERC20Proxy {
continue
}
orders, err = w.findOrdersAndGenerateOrderEvents(approvalEvent.Owner, log.Address, nil)
orders, err = w.findOrders(approvalEvent.Owner, log.Address, nil)
if err != nil {
return err
}
Expand All @@ -304,7 +308,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
}
return err
}
orders, err = w.findOrdersAndGenerateOrderEvents(transferEvent.From, log.Address, transferEvent.TokenId)
orders, err = w.findOrders(transferEvent.From, log.Address, transferEvent.TokenId)
if err != nil {
return err
}
Expand All @@ -322,7 +326,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
if approvalEvent.Approved != w.contractAddresses.ERC721Proxy {
continue
}
orders, err = w.findOrdersAndGenerateOrderEvents(approvalEvent.Owner, log.Address, approvalEvent.TokenId)
orders, err = w.findOrders(approvalEvent.Owner, log.Address, approvalEvent.TokenId)
if err != nil {
return err
}
Expand All @@ -340,7 +344,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
if approvalForAllEvent.Operator != w.contractAddresses.ERC721Proxy {
continue
}
orders, err = w.findOrdersAndGenerateOrderEvents(approvalForAllEvent.Owner, log.Address, nil)
orders, err = w.findOrders(approvalForAllEvent.Owner, log.Address, nil)
if err != nil {
return err
}
Expand All @@ -354,7 +358,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
}
return err
}
orders, err = w.findOrdersAndGenerateOrderEvents(withdrawalEvent.Owner, log.Address, nil)
orders, err = w.findOrders(withdrawalEvent.Owner, log.Address, nil)
if err != nil {
return err
}
Expand All @@ -368,7 +372,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
}
return err
}
orders, err = w.findOrdersAndGenerateOrderEvents(depositEvent.Owner, log.Address, nil)
orders, err = w.findOrders(depositEvent.Owner, log.Address, nil)
if err != nil {
return err
}
Expand All @@ -382,7 +386,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
}
return err
}
order := w.findOrderAndGenerateOrderEvents(exchangeFillEvent.OrderHash)
order := w.findOrder(exchangeFillEvent.OrderHash)
if order != nil {
orders = append(orders, order)
}
Expand All @@ -397,7 +401,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
return err
}
orders = []*meshdb.Order{}
order := w.findOrderAndGenerateOrderEvents(exchangeCancelEvent.OrderHash)
order := w.findOrder(exchangeCancelEvent.OrderHash)
if order != nil {
orders = append(orders, order)
}
Expand Down Expand Up @@ -440,10 +444,14 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error {
}
}
}
return w.generateOrderEventsIfChanged(hashToOrderWithTxHashes)
return w.generateOrderEventsIfChanged(ordersColTxn, hashToOrderWithTxHashes)
}

func (w *Watcher) cleanup(ctx context.Context) error {
ordersColTxn := w.meshDB.Orders.OpenTransaction()
defer func() {
_ = ordersColTxn.Discard()
}()
lastUpdatedCutOff := time.Now().Add(-lastUpdatedBuffer)
orders, err := w.meshDB.FindOrdersLastUpdatedBefore(lastUpdatedCutOff)
if err != nil {
Expand All @@ -465,7 +473,7 @@ func (w *Watcher) cleanup(ctx context.Context) error {
TxHashes: map[common.Hash]interface{}{},
}
}
return w.generateOrderEventsIfChanged(hashToOrderWithTxHashes)
return w.generateOrderEventsIfChanged(ordersColTxn, hashToOrderWithTxHashes)
}

// Add adds a 0x order to the DB and watches it for changes in fillability. It
Expand Down Expand Up @@ -536,7 +544,7 @@ type OrderWithTxHashes struct {
TxHashes map[common.Hash]interface{}
}

func (w *Watcher) findOrderAndGenerateOrderEvents(orderHash common.Hash) *meshdb.Order {
func (w *Watcher) findOrder(orderHash common.Hash) *meshdb.Order {
order := meshdb.Order{}
err := w.meshDB.Orders.FindByID(orderHash.Bytes(), &order)
if err != nil {
Expand All @@ -553,7 +561,7 @@ func (w *Watcher) findOrderAndGenerateOrderEvents(orderHash common.Hash) *meshdb
return &order
}

func (w *Watcher) findOrdersAndGenerateOrderEvents(makerAddress, tokenAddress common.Address, tokenID *big.Int) ([]*meshdb.Order, error) {
func (w *Watcher) findOrders(makerAddress, tokenAddress common.Address, tokenID *big.Int) ([]*meshdb.Order, error) {
orders, err := w.meshDB.FindOrdersByMakerAddressTokenAddressAndTokenID(makerAddress, tokenAddress, tokenID)
if err != nil {
logger.WithFields(logger.Fields{
Expand All @@ -564,7 +572,7 @@ func (w *Watcher) findOrdersAndGenerateOrderEvents(makerAddress, tokenAddress co
return orders, nil
}

func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[common.Hash]*OrderWithTxHashes) error {
func (w *Watcher) generateOrderEventsIfChanged(ordersColTxn *db.Transaction, hashToOrderWithTxHashes map[common.Hash]*OrderWithTxHashes) error {
signedOrders := []*zeroex.SignedOrder{}
for _, orderWithTxHashes := range hashToOrderWithTxHashes {
order := orderWithTxHashes.Order
Expand Down Expand Up @@ -602,7 +610,7 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo
// A previous event caused this order to be removed from DB because it's
// fillableAmount became 0, but it has now been revived (e.g., block re-org
// causes order fill txn to get reverted). We need to re-add order and emit an event.
w.rewatchOrder(order, acceptedOrderInfo)
w.rewatchOrder(ordersColTxn, order, acceptedOrderInfo)
orderEvent := &zeroex.OrderEvent{
OrderHash: acceptedOrderInfo.OrderHash,
SignedOrder: order.SignedOrder,
Expand All @@ -612,10 +620,11 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo
}
orderEvents = append(orderEvents, orderEvent)
} else if oldFillableAmount.Cmp(newFillableAmount) == 0 {
// No important state-change happened, ignore
// Noop
// No important state-change happened, simply update lastUpdated timestamp in DB
w.updateOrderDBEntry(ordersColTxn, order)
} else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && oldAmountIsMoreThenNewAmount {
// Order was filled, emit event
// Order was filled, emit event and update order in DB
w.updateOrderDBEntry(ordersColTxn, order)
orderEvent := &zeroex.OrderEvent{
OrderHash: acceptedOrderInfo.OrderHash,
SignedOrder: order.SignedOrder,
Expand All @@ -625,8 +634,9 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo
}
orderEvents = append(orderEvents, orderEvent)
} else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && !oldAmountIsMoreThenNewAmount {
// The order is now fillable for more then it was before. E.g.:
// 1. A fill txn reverted (block-reorg)
// The order is now fillable for more then it was before. E.g.: A fill txn reverted (block-reorg)
// Update order in DB and emit event
w.updateOrderDBEntry(ordersColTxn, order)
orderEvent := &zeroex.OrderEvent{
OrderHash: acceptedOrderInfo.OrderHash,
SignedOrder: order.SignedOrder,
Expand All @@ -646,11 +656,12 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo
order := orderWithTxHashes.Order
oldFillableAmount := order.FillableTakerAssetAmount
if oldFillableAmount.Cmp(big.NewInt(0)) == 0 {
// If the oldFillableAmount was already 0, this order is already flagged for removal
// Noop
// If the oldFillableAmount was already 0, this order is already flagged for removal.
// Update it's lastUpdated timestamp in DB
w.updateOrderDBEntry(ordersColTxn, order)
} else {
// If oldFillableAmount > 0, it got fullyFilled, cancelled, expired or unfunded, emit event
w.unwatchOrder(order, big.NewInt(0))
w.unwatchOrder(ordersColTxn, order, big.NewInt(0))
kind, ok := zeroex.ConvertRejectOrderCodeToOrderEventKind(rejectedOrderInfo.Status)
if !ok {
err := fmt.Errorf("no OrderEventKind corresponding to RejectedOrderStatus: %q", rejectedOrderInfo.Status)
Expand Down Expand Up @@ -678,17 +689,38 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo
return err
}
}
if err := ordersColTxn.Commit(); err != nil {
logger.WithFields(logger.Fields{
"error": err.Error(),
}).Error("Failed to commit orders collection transaction")
}

if len(orderEvents) > 0 {
w.orderFeed.Send(orderEvents)
}
return nil
}

func (w *Watcher) rewatchOrder(order *meshdb.Order, orderInfo *zeroex.AcceptedOrderInfo) {
type updatableOrdersCol interface {
Update(model db.Model) error
}

func (w *Watcher) updateOrderDBEntry(u updatableOrdersCol, order *meshdb.Order) {
order.LastUpdated = time.Now().UTC()
err := u.Update(order)
if err != nil {
logger.WithFields(logger.Fields{
"error": err.Error(),
"order": order,
}).Error("Failed to update order")
}
}

func (w *Watcher) rewatchOrder(u updatableOrdersCol, order *meshdb.Order, orderInfo *zeroex.AcceptedOrderInfo) {
order.IsRemoved = false
order.LastUpdated = time.Now().UTC()
order.FillableTakerAssetAmount = orderInfo.FillableTakerAssetAmount
err := w.meshDB.Orders.Update(order)
err := u.Update(order)
if err != nil {
logger.WithFields(logger.Fields{
"error": err.Error(),
Expand All @@ -701,11 +733,11 @@ func (w *Watcher) rewatchOrder(order *meshdb.Order, orderInfo *zeroex.AcceptedOr
w.expirationWatcher.Add(expirationTimestamp, order.Hash.Hex())
}

func (w *Watcher) unwatchOrder(order *meshdb.Order, newFillableAmount *big.Int) {
func (w *Watcher) unwatchOrder(u updatableOrdersCol, order *meshdb.Order, newFillableAmount *big.Int) {
order.IsRemoved = true
order.LastUpdated = time.Now().UTC()
order.FillableTakerAssetAmount = newFillableAmount
err := w.meshDB.Orders.Update(order)
err := u.Update(order)
if err != nil {
logger.WithFields(logger.Fields{
"error": err.Error(),
Expand Down