From 627c117a7a55cc22221b740e17c5ba2b8d58ba35 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 3 Sep 2019 16:41:19 +0200 Subject: [PATCH 1/8] Update the order's lastUpdated timestamp in the DB whenever we re-validate it, not only if it's added/removed from the OrderWatcher --- zeroex/orderwatch/order_watcher.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index cd8be82ac..d6c250399 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -612,9 +612,10 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo } orderEvents = append(orderEvents, orderEvent) } else if oldFillableAmount.Cmp(newFillableAmount) == 0 { - // No important state-change happened, ignore - // Noop + // No important state-change happened, simply update lastUpdated timestamp + w.actualizeLastUpdated(order) } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && oldAmountIsMoreThenNewAmount { + w.actualizeLastUpdated(order) // Order was filled, emit event orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, @@ -625,6 +626,7 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo } orderEvents = append(orderEvents, orderEvent) } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && !oldAmountIsMoreThenNewAmount { + w.actualizeLastUpdated(order) // The order is now fillable for more then it was before. E.g.: // 1. A fill txn reverted (block-reorg) orderEvent := &zeroex.OrderEvent{ @@ -646,8 +648,9 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo order := orderWithTxHashes.Order oldFillableAmount := order.FillableTakerAssetAmount if oldFillableAmount.Cmp(big.NewInt(0)) == 0 { - // If the oldFillableAmount was already 0, this order is already flagged for removal - // Noop + // If the oldFillableAmount was already 0, this order is already flagged for removal. We simply + // update it's lastUpdated timestamp + w.actualizeLastUpdated(order) } else { // If oldFillableAmount > 0, it got fullyFilled, cancelled, expired or unfunded, emit event w.unwatchOrder(order, big.NewInt(0)) @@ -684,6 +687,17 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo return nil } +func (w *Watcher) actualizeLastUpdated(order *meshdb.Order) { + order.LastUpdated = time.Now().UTC() + err := w.meshDB.Orders.Update(order) + if err != nil { + logger.WithFields(logger.Fields{ + "error": err.Error(), + "order": order, + }).Error("Failed to update order") + } +} + func (w *Watcher) rewatchOrder(order *meshdb.Order, orderInfo *zeroex.AcceptedOrderInfo) { order.IsRemoved = false order.LastUpdated = time.Now().UTC() From 96e5b52f87cc3e934eaabba83cf62d6619569096 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 3 Sep 2019 17:19:55 +0200 Subject: [PATCH 2/8] Update both the lastUpdated and fillableTakerAssetAmount in DB --- zeroex/orderwatch/order_watcher.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index d6c250399..5ed0cc81e 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -612,11 +612,11 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo } orderEvents = append(orderEvents, orderEvent) } else if oldFillableAmount.Cmp(newFillableAmount) == 0 { - // No important state-change happened, simply update lastUpdated timestamp - w.actualizeLastUpdated(order) + // No important state-change happened, simply update lastUpdated timestamp in DB + w.updateOrderLastUpdatedAndFillableAmount(order) } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && oldAmountIsMoreThenNewAmount { - w.actualizeLastUpdated(order) - // Order was filled, emit event + // Order was filled, emit event and update order in DB + w.updateOrderLastUpdatedAndFillableAmount(order) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -626,9 +626,9 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo } orderEvents = append(orderEvents, orderEvent) } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && !oldAmountIsMoreThenNewAmount { - w.actualizeLastUpdated(order) - // The order is now fillable for more then it was before. E.g.: - // 1. A fill txn reverted (block-reorg) + // The order is now fillable for more then it was before. E.g.: A fill txn reverted (block-reorg) + // Update order in DB and emit event + w.updateOrderLastUpdatedAndFillableAmount(order) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -648,9 +648,9 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo order := orderWithTxHashes.Order oldFillableAmount := order.FillableTakerAssetAmount if oldFillableAmount.Cmp(big.NewInt(0)) == 0 { - // If the oldFillableAmount was already 0, this order is already flagged for removal. We simply - // update it's lastUpdated timestamp - w.actualizeLastUpdated(order) + // If the oldFillableAmount was already 0, this order is already flagged for removal. + // Update it's lastUpdated timestamp in DB + w.updateOrderLastUpdatedAndFillableAmount(order) } else { // If oldFillableAmount > 0, it got fullyFilled, cancelled, expired or unfunded, emit event w.unwatchOrder(order, big.NewInt(0)) @@ -687,8 +687,9 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo return nil } -func (w *Watcher) actualizeLastUpdated(order *meshdb.Order) { +func (w *Watcher) updateOrderLastUpdatedAndFillableAmount(order *meshdb.Order) { order.LastUpdated = time.Now().UTC() + order.FillableTakerAssetAmount = order.FillableTakerAssetAmount err := w.meshDB.Orders.Update(order) if err != nil { logger.WithFields(logger.Fields{ From 4c044154246483ec0dc4574a62c3c84591199806 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 3 Sep 2019 17:26:25 +0200 Subject: [PATCH 3/8] Update method name for clarity --- zeroex/orderwatch/order_watcher.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 5ed0cc81e..a07e8de94 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -613,10 +613,10 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo orderEvents = append(orderEvents, orderEvent) } else if oldFillableAmount.Cmp(newFillableAmount) == 0 { // No important state-change happened, simply update lastUpdated timestamp in DB - w.updateOrderLastUpdatedAndFillableAmount(order) + w.updateOrderDBEntry(order) } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && oldAmountIsMoreThenNewAmount { // Order was filled, emit event and update order in DB - w.updateOrderLastUpdatedAndFillableAmount(order) + w.updateOrderDBEntry(order) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -628,7 +628,7 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && !oldAmountIsMoreThenNewAmount { // The order is now fillable for more then it was before. E.g.: A fill txn reverted (block-reorg) // Update order in DB and emit event - w.updateOrderLastUpdatedAndFillableAmount(order) + w.updateOrderDBEntry(order) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -650,7 +650,7 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo if oldFillableAmount.Cmp(big.NewInt(0)) == 0 { // If the oldFillableAmount was already 0, this order is already flagged for removal. // Update it's lastUpdated timestamp in DB - w.updateOrderLastUpdatedAndFillableAmount(order) + w.updateOrderDBEntry(order) } else { // If oldFillableAmount > 0, it got fullyFilled, cancelled, expired or unfunded, emit event w.unwatchOrder(order, big.NewInt(0)) @@ -687,9 +687,8 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo return nil } -func (w *Watcher) updateOrderLastUpdatedAndFillableAmount(order *meshdb.Order) { +func (w *Watcher) updateOrderDBEntry(order *meshdb.Order) { order.LastUpdated = time.Now().UTC() - order.FillableTakerAssetAmount = order.FillableTakerAssetAmount err := w.meshDB.Orders.Update(order) if err != nil { logger.WithFields(logger.Fields{ From 4ceff74ed62e67c3cd602d21f4292e2cf8c57a80 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Thu, 5 Sep 2019 10:02:16 +0200 Subject: [PATCH 4/8] Rename method for clarity since it now only finds the orders --- zeroex/orderwatch/order_watcher.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index a07e8de94..d46de098d 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -272,7 +272,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { } return err } - orders, err = w.findOrdersAndGenerateOrderEvents(transferEvent.From, log.Address, nil) + orders, err = w.findOrders(transferEvent.From, log.Address, nil) if err != nil { return err } @@ -290,7 +290,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { if approvalEvent.Spender != w.contractAddresses.ERC20Proxy { continue } - orders, err = w.findOrdersAndGenerateOrderEvents(approvalEvent.Owner, log.Address, nil) + orders, err = w.findOrders(approvalEvent.Owner, log.Address, nil) if err != nil { return err } @@ -304,7 +304,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { } return err } - orders, err = w.findOrdersAndGenerateOrderEvents(transferEvent.From, log.Address, transferEvent.TokenId) + orders, err = w.findOrders(transferEvent.From, log.Address, transferEvent.TokenId) if err != nil { return err } @@ -322,7 +322,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { if approvalEvent.Approved != w.contractAddresses.ERC721Proxy { continue } - orders, err = w.findOrdersAndGenerateOrderEvents(approvalEvent.Owner, log.Address, approvalEvent.TokenId) + orders, err = w.findOrders(approvalEvent.Owner, log.Address, approvalEvent.TokenId) if err != nil { return err } @@ -340,7 +340,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { if approvalForAllEvent.Operator != w.contractAddresses.ERC721Proxy { continue } - orders, err = w.findOrdersAndGenerateOrderEvents(approvalForAllEvent.Owner, log.Address, nil) + orders, err = w.findOrders(approvalForAllEvent.Owner, log.Address, nil) if err != nil { return err } @@ -354,7 +354,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { } return err } - orders, err = w.findOrdersAndGenerateOrderEvents(withdrawalEvent.Owner, log.Address, nil) + orders, err = w.findOrders(withdrawalEvent.Owner, log.Address, nil) if err != nil { return err } @@ -368,7 +368,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { } return err } - orders, err = w.findOrdersAndGenerateOrderEvents(depositEvent.Owner, log.Address, nil) + orders, err = w.findOrders(depositEvent.Owner, log.Address, nil) if err != nil { return err } @@ -553,7 +553,7 @@ func (w *Watcher) findOrderAndGenerateOrderEvents(orderHash common.Hash) *meshdb return &order } -func (w *Watcher) findOrdersAndGenerateOrderEvents(makerAddress, tokenAddress common.Address, tokenID *big.Int) ([]*meshdb.Order, error) { +func (w *Watcher) findOrders(makerAddress, tokenAddress common.Address, tokenID *big.Int) ([]*meshdb.Order, error) { orders, err := w.meshDB.FindOrdersByMakerAddressTokenAddressAndTokenID(makerAddress, tokenAddress, tokenID) if err != nil { logger.WithFields(logger.Fields{ From 6755141d497b3ea40a465b59f0c771a182fa66c4 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Thu, 5 Sep 2019 10:29:09 +0200 Subject: [PATCH 5/8] Put OrderEvent generation within an Orders collection transaction so that no one else can modify the orders while the orders are being read and modified by this method --- zeroex/orderwatch/order_watcher.go | 37 +++++++++++++++++++----------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index d46de098d..053078d2c 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -226,7 +226,7 @@ func (w *Watcher) handleExpiration(expiredOrders []expirationwatch.ExpiredItem) FillableTakerAssetAmount: big.NewInt(0), OrderStatus: zeroex.OSExpired, } - w.unwatchOrder(order, order.FillableTakerAssetAmount) + w.unwatchOrder(w.meshDB.Orders, order, order.FillableTakerAssetAmount) orderEvent := &zeroex.OrderEvent{ OrderHash: orderInfo.OrderHash, @@ -565,6 +565,7 @@ func (w *Watcher) findOrders(makerAddress, tokenAddress common.Address, tokenID } func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[common.Hash]*OrderWithTxHashes) error { + ordersColTxn := w.meshDB.Orders.OpenTransaction() signedOrders := []*zeroex.SignedOrder{} for _, orderWithTxHashes := range hashToOrderWithTxHashes { order := orderWithTxHashes.Order @@ -602,7 +603,7 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo // A previous event caused this order to be removed from DB because it's // fillableAmount became 0, but it has now been revived (e.g., block re-org // causes order fill txn to get reverted). We need to re-add order and emit an event. - w.rewatchOrder(order, acceptedOrderInfo) + w.rewatchOrder(ordersColTxn, order, acceptedOrderInfo) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -613,10 +614,10 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo orderEvents = append(orderEvents, orderEvent) } else if oldFillableAmount.Cmp(newFillableAmount) == 0 { // No important state-change happened, simply update lastUpdated timestamp in DB - w.updateOrderDBEntry(order) + w.updateOrderDBEntry(ordersColTxn, order) } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && oldAmountIsMoreThenNewAmount { // Order was filled, emit event and update order in DB - w.updateOrderDBEntry(order) + w.updateOrderDBEntry(ordersColTxn, order) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -628,7 +629,7 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo } else if oldFillableAmount.Cmp(big.NewInt(0)) == 1 && !oldAmountIsMoreThenNewAmount { // The order is now fillable for more then it was before. E.g.: A fill txn reverted (block-reorg) // Update order in DB and emit event - w.updateOrderDBEntry(order) + w.updateOrderDBEntry(ordersColTxn, order) orderEvent := &zeroex.OrderEvent{ OrderHash: acceptedOrderInfo.OrderHash, SignedOrder: order.SignedOrder, @@ -650,10 +651,10 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo if oldFillableAmount.Cmp(big.NewInt(0)) == 0 { // If the oldFillableAmount was already 0, this order is already flagged for removal. // Update it's lastUpdated timestamp in DB - w.updateOrderDBEntry(order) + w.updateOrderDBEntry(ordersColTxn, order) } else { // If oldFillableAmount > 0, it got fullyFilled, cancelled, expired or unfunded, emit event - w.unwatchOrder(order, big.NewInt(0)) + w.unwatchOrder(ordersColTxn, order, big.NewInt(0)) kind, ok := zeroex.ConvertRejectOrderCodeToOrderEventKind(rejectedOrderInfo.Status) if !ok { err := fmt.Errorf("no OrderEventKind corresponding to RejectedOrderStatus: %q", rejectedOrderInfo.Status) @@ -681,15 +682,25 @@ func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[commo return err } } + if err := ordersColTxn.Commit(); err != nil { + logger.WithFields(logger.Fields{ + "error": err.Error(), + }).Error("Failed to commit orders collection transaction") + } + if len(orderEvents) > 0 { w.orderFeed.Send(orderEvents) } return nil } -func (w *Watcher) updateOrderDBEntry(order *meshdb.Order) { +type updatableOrdersCol interface { + Update(model db.Model) error +} + +func (w *Watcher) updateOrderDBEntry(u updatableOrdersCol, order *meshdb.Order) { order.LastUpdated = time.Now().UTC() - err := w.meshDB.Orders.Update(order) + err := u.Update(order) if err != nil { logger.WithFields(logger.Fields{ "error": err.Error(), @@ -698,11 +709,11 @@ func (w *Watcher) updateOrderDBEntry(order *meshdb.Order) { } } -func (w *Watcher) rewatchOrder(order *meshdb.Order, orderInfo *zeroex.AcceptedOrderInfo) { +func (w *Watcher) rewatchOrder(u updatableOrdersCol, order *meshdb.Order, orderInfo *zeroex.AcceptedOrderInfo) { order.IsRemoved = false order.LastUpdated = time.Now().UTC() order.FillableTakerAssetAmount = orderInfo.FillableTakerAssetAmount - err := w.meshDB.Orders.Update(order) + err := u.Update(order) if err != nil { logger.WithFields(logger.Fields{ "error": err.Error(), @@ -715,11 +726,11 @@ func (w *Watcher) rewatchOrder(order *meshdb.Order, orderInfo *zeroex.AcceptedOr w.expirationWatcher.Add(expirationTimestamp, order.Hash.Hex()) } -func (w *Watcher) unwatchOrder(order *meshdb.Order, newFillableAmount *big.Int) { +func (w *Watcher) unwatchOrder(u updatableOrdersCol, order *meshdb.Order, newFillableAmount *big.Int) { order.IsRemoved = true order.LastUpdated = time.Now().UTC() order.FillableTakerAssetAmount = newFillableAmount - err := w.meshDB.Orders.Update(order) + err := u.Update(order) if err != nil { logger.WithFields(logger.Fields{ "error": err.Error(), From 36a086f9734deb38ad64691af2347e51d4ef100e Mon Sep 17 00:00:00 2001 From: fabioberger Date: Thu, 5 Sep 2019 10:34:57 +0200 Subject: [PATCH 6/8] Add CHANGELOG entries for the bug fixes --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd52af8a8..ff06afb16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ This changelog is a work in progress and may contain notes for versions which ha ### Bug fixes 🐞 - Fixed two related bugs: One where order expiration events would be emitted multiple times and another that meant subsequent fill/cancel events for orders deemed expired were not emitted. Fills/cancels for expired orders will continue to be emitted if they occur within ~4 mins (i.e. 20 blocks) of the expiration ([#385](https://github.com/0xProject/0x-mesh/pull/385)). +- Fixed a data race-condition in OrderWatcher that could have caused order collection updates to be overwritten in the DB. ([#386](https://github.com/0xProject/0x-mesh/pull/386)) +- Fixed a bug where `fillableTakerAssetAmount` and `lastUpdated` were not always being properly updated in the DB. ([#386](https://github.com/0xProject/0x-mesh/pull/386)) - Fixed some issues with key prefixes for certain types not being applied correctly to logs ([#375](https://github.com/0xProject/0x-mesh/pull/375)). - Fixed an issue where order hashes were not being correctly logged ([#368](https://github.com/0xProject/0x-mesh/pull/368)). - Mesh will now properly shut down if the database is unexpectedly closed ([#370](https://github.com/0xProject/0x-mesh/pull/370)). From 933f75e5c0d4c3fad6ddf93881679030449395af Mon Sep 17 00:00:00 2001 From: fabioberger Date: Thu, 5 Sep 2019 21:52:59 +0200 Subject: [PATCH 7/8] Rename method for clarity --- zeroex/orderwatch/order_watcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index 053078d2c..b1b7e3f80 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -382,7 +382,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { } return err } - order := w.findOrderAndGenerateOrderEvents(exchangeFillEvent.OrderHash) + order := w.findOrder(exchangeFillEvent.OrderHash) if order != nil { orders = append(orders, order) } @@ -397,7 +397,7 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { return err } orders = []*meshdb.Order{} - order := w.findOrderAndGenerateOrderEvents(exchangeCancelEvent.OrderHash) + order := w.findOrder(exchangeCancelEvent.OrderHash) if order != nil { orders = append(orders, order) } @@ -536,7 +536,7 @@ type OrderWithTxHashes struct { TxHashes map[common.Hash]interface{} } -func (w *Watcher) findOrderAndGenerateOrderEvents(orderHash common.Hash) *meshdb.Order { +func (w *Watcher) findOrder(orderHash common.Hash) *meshdb.Order { order := meshdb.Order{} err := w.meshDB.Orders.FindByID(orderHash.Bytes(), &order) if err != nil { From 15b83b8292b2c9a39fa0e2e6366466d1d60dfc53 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Thu, 5 Sep 2019 22:12:30 +0200 Subject: [PATCH 8/8] Move the orders collection transaction opening higher up so that it also encompasses the order reads, not just the writes --- zeroex/orderwatch/order_watcher.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/zeroex/orderwatch/order_watcher.go b/zeroex/orderwatch/order_watcher.go index b1b7e3f80..04c3910fc 100644 --- a/zeroex/orderwatch/order_watcher.go +++ b/zeroex/orderwatch/order_watcher.go @@ -240,6 +240,10 @@ func (w *Watcher) handleExpiration(expiredOrders []expirationwatch.ExpiredItem) } func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { + ordersColTxn := w.meshDB.Orders.OpenTransaction() + defer func() { + _ = ordersColTxn.Discard() + }() hashToOrderWithTxHashes := map[common.Hash]*OrderWithTxHashes{} for _, event := range events { for _, log := range event.BlockHeader.Logs { @@ -440,10 +444,14 @@ func (w *Watcher) handleBlockEvents(events []*blockwatch.Event) error { } } } - return w.generateOrderEventsIfChanged(hashToOrderWithTxHashes) + return w.generateOrderEventsIfChanged(ordersColTxn, hashToOrderWithTxHashes) } func (w *Watcher) cleanup(ctx context.Context) error { + ordersColTxn := w.meshDB.Orders.OpenTransaction() + defer func() { + _ = ordersColTxn.Discard() + }() lastUpdatedCutOff := time.Now().Add(-lastUpdatedBuffer) orders, err := w.meshDB.FindOrdersLastUpdatedBefore(lastUpdatedCutOff) if err != nil { @@ -465,7 +473,7 @@ func (w *Watcher) cleanup(ctx context.Context) error { TxHashes: map[common.Hash]interface{}{}, } } - return w.generateOrderEventsIfChanged(hashToOrderWithTxHashes) + return w.generateOrderEventsIfChanged(ordersColTxn, hashToOrderWithTxHashes) } // Add adds a 0x order to the DB and watches it for changes in fillability. It @@ -564,8 +572,7 @@ func (w *Watcher) findOrders(makerAddress, tokenAddress common.Address, tokenID return orders, nil } -func (w *Watcher) generateOrderEventsIfChanged(hashToOrderWithTxHashes map[common.Hash]*OrderWithTxHashes) error { - ordersColTxn := w.meshDB.Orders.OpenTransaction() +func (w *Watcher) generateOrderEventsIfChanged(ordersColTxn *db.Transaction, hashToOrderWithTxHashes map[common.Hash]*OrderWithTxHashes) error { signedOrders := []*zeroex.SignedOrder{} for _, orderWithTxHashes := range hashToOrderWithTxHashes { order := orderWithTxHashes.Order