Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orderwatch: Avoid conflicting txn error when updating block headers in DB #614

Merged
merged 12 commits into from
Jan 10, 2020
Merged
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ This changelog is a work in progress and may contain notes for versions which ha
### Bug fixes 🐞

- Fixed a typo ("rendervouz" --> "rendezvous") in GetStatsResponse. ([#611](https://github.com/0x-mesh/pull/611)).
- Fixed a bug where we attempted to update the same order multiple times in a single DB txn, causing the later update to noop. ([#623](https://github.com/0xProject/0x-mesh/pull/623))
- Fixed a bug where we attempted to update the same order multiple times in a single DB txn, causing the later update to noop. ([#623](https://github.com/0xProject/0x-mesh/pull/623)).
- Fixed a bug which could cause Mesh to exit if a re-org condition occurs causing a block to be added and removed within the same block sync operation. ([#614](https://github.com/0xProject/0x-mesh/pull/614)).


## v8.0.0-beta-0xv3
Expand Down
22 changes: 16 additions & 6 deletions db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@ package db

import (
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/albrow/stringset"
)

var (
ErrDiscarded = errors.New("transaction has already been discarded")
ErrCommitted = errors.New("transaction has already been committed")
ErrConflictingOperations = errors.New("cannot perform more than one operation (insert/delete/update) on the same model within a transaction")
ErrDiscarded = errors.New("transaction has already been discarded")
ErrCommitted = errors.New("transaction has already been committed")
)

// ConflictingOperationsError is returned when two conflicting operations are attempted within the same
// transaction
type ConflictingOperationsError struct {
operation string
}

func (e ConflictingOperationsError) Error() string {
return fmt.Sprintf("error on %s: cannot perform more than one operation on the same model within a transaction", e.operation)
}

// Transaction is an atomic database transaction for a single collection which
// can be used to guarantee consistency.
type Transaction struct {
Expand Down Expand Up @@ -139,7 +149,7 @@ func (txn *Transaction) Insert(model Model) error {
return err
}
if txn.affectedIDs.Contains(string(model.ID())) {
return ErrConflictingOperations
return ConflictingOperationsError{operation: "insert"}
}
if err := insertWithTransaction(txn.colInfo, txn.readWriter, model); err != nil {
return err
Expand All @@ -159,7 +169,7 @@ func (txn *Transaction) Update(model Model) error {
return err
}
if txn.affectedIDs.Contains(string(model.ID())) {
return ErrConflictingOperations
return ConflictingOperationsError{operation: "update"}
}
if err := updateWithTransaction(txn.colInfo, txn.readWriter, model); err != nil {
return err
Expand All @@ -178,7 +188,7 @@ func (txn *Transaction) Delete(id []byte) error {
return err
}
if txn.affectedIDs.Contains(string(id)) {
return ErrConflictingOperations
return ConflictingOperationsError{operation: "delete"}
}
if err := deleteWithTransaction(txn.colInfo, txn.readWriter, id); err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions db/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestTransactionDeleteThenInsertSameModel(t *testing.T) {
require.NoError(t, txn.Delete(model.ID()))
err = txn.Insert(model)
assert.Error(t, err)
assert.Equal(t, ErrConflictingOperations, err, "wrong error")
assert.Equal(t, ConflictingOperationsError{operation: "insert"}, err, "wrong error")
}

func TestTransactionInsertThenDeleteSameModel(t *testing.T) {
Expand All @@ -309,7 +309,7 @@ func TestTransactionInsertThenDeleteSameModel(t *testing.T) {
require.NoError(t, txn.Insert(model))
err = txn.Delete(model.ID())
assert.Error(t, err)
assert.Equal(t, ErrConflictingOperations, err, "wrong error")
assert.Equal(t, ConflictingOperationsError{operation: "delete"}, err, "wrong error")
}

func TestTransactionInsertThenInsertSameModel(t *testing.T) {
Expand All @@ -334,7 +334,7 @@ func TestTransactionInsertThenInsertSameModel(t *testing.T) {
require.NoError(t, txn.Insert(model))
err = txn.Insert(model)
assert.Error(t, err)
assert.Equal(t, ErrConflictingOperations, err, "wrong error")
assert.Equal(t, ConflictingOperationsError{operation: "insert"}, err, "wrong error")
}

func TestTransactionDeleteThenDeleteSameModel(t *testing.T) {
Expand All @@ -360,7 +360,7 @@ func TestTransactionDeleteThenDeleteSameModel(t *testing.T) {
require.NoError(t, txn.Delete(model.ID()))
err = txn.Delete(model.ID())
assert.Error(t, err)
assert.Equal(t, ErrConflictingOperations, err, "wrong error")
assert.Equal(t, ConflictingOperationsError{operation: "delete"}, err, "wrong error")
}

func TestTransactionInsertThenUpdateSameModel(t *testing.T) {
Expand All @@ -385,5 +385,5 @@ func TestTransactionInsertThenUpdateSameModel(t *testing.T) {
require.NoError(t, txn.Insert(model))
err = txn.Update(model)
assert.Error(t, err)
assert.Equal(t, ErrConflictingOperations, err, "wrong error")
assert.Equal(t, ConflictingOperationsError{operation: "update"}, err, "wrong error")
}
104 changes: 79 additions & 25 deletions zeroex/orderwatch/order_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,24 +452,14 @@ func (w *Watcher) handleBlockEvents(
}
latestBlockNumber, latestBlockTimestamp := w.getBlockchainState(events)

err = updateBlockHeadersStoredInDB(miniHeadersColTxn, events)
if err != nil {
return err
}

orderHashToDBOrder := map[common.Hash]*meshdb.Order{}
orderHashToEvents := map[common.Hash][]*zeroex.ContractEvent{}
for _, event := range events {
blockHeader := event.BlockHeader
switch event.Type {
case blockwatch.Added:
err = miniHeadersColTxn.Insert(blockHeader)
if err != nil {
return err
}
case blockwatch.Removed:
err = miniHeadersColTxn.Delete(blockHeader.ID())
if err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized block event type encountered: %d", event.Type)
}
for _, log := range event.BlockHeader.Logs {
eventType, err := w.eventDecoder.FindEventType(log)
if err != nil {
Expand Down Expand Up @@ -950,6 +940,13 @@ func (w *Watcher) add(orderInfo *ordervalidator.AcceptedOrderInfo, validationBlo
// return an error.
return orderEvents, nil
}
if _, ok := err.(db.ConflictingOperationsError); ok {
logger.WithFields(logger.Fields{
"error": err.Error(),
"order": order,
}).Error("Failed to insert order into DB")
return orderEvents, nil
}
return orderEvents, err
}
if err := txn.Commit(); err != nil {
Expand Down Expand Up @@ -1028,6 +1025,62 @@ func (w *Watcher) trimOrdersAndGenerateEvents() ([]*zeroex.OrderEvent, error) {
return orderEvents, nil
}

// updateBlockHeadersStoredInDB updates the block headers stored in the DB. Since our DB txns don't support
// multiple operations involving the same entry, we make sure we only perform either an insertion or a deletion
// for each block in this method.
func updateBlockHeadersStoredInDB(miniHeadersColTxn *db.Transaction, events []*blockwatch.Event) error {
blocksToAdd := map[common.Hash]*miniheader.MiniHeader{}
blocksToRemove := map[common.Hash]*miniheader.MiniHeader{}
for _, event := range events {
blockHeader := event.BlockHeader
switch event.Type {
case blockwatch.Added:
if _, ok := blocksToAdd[blockHeader.Hash]; ok {
continue
}
if _, ok := blocksToRemove[blockHeader.Hash]; ok {
delete(blocksToRemove, blockHeader.Hash)
}
blocksToAdd[blockHeader.Hash] = blockHeader
case blockwatch.Removed:
if _, ok := blocksToAdd[blockHeader.Hash]; ok {
delete(blocksToAdd, blockHeader.Hash)
}
if _, ok := blocksToRemove[blockHeader.Hash]; ok {
continue
}
blocksToRemove[blockHeader.Hash] = blockHeader
default:
return fmt.Errorf("Unrecognized block event type encountered: %d", event.Type)
}
}

for _, blockHeader := range blocksToAdd {
if err := miniHeadersColTxn.Insert(blockHeader); err != nil {
if _, ok := err.(db.AlreadyExistsError); !ok {
logger.WithFields(logger.Fields{
"error": err.Error(),
"hash": blockHeader.Hash,
"number": blockHeader.Number,
}).Error("Failed to insert miniHeaders")
}
}
}
for _, blockHeader := range blocksToRemove {
if err := miniHeadersColTxn.Delete(blockHeader.ID()); err != nil {
if _, ok := err.(db.NotFoundError); !ok {
logger.WithFields(logger.Fields{
"error": err.Error(),
"hash": blockHeader.Hash,
"number": blockHeader.Number,
}).Error("Failed to delete miniHeaders")
}
}
}

return nil
}

// MaxExpirationTime returns the current maximum expiration time for incoming
// orders.
func (w *Watcher) MaxExpirationTime() *big.Int {
Expand Down Expand Up @@ -1514,16 +1567,17 @@ type orderDeleter interface {
func (w *Watcher) permanentlyDeleteOrder(deleter orderDeleter, order *meshdb.Order) error {
err := deleter.Delete(order.Hash.Bytes())
if err != nil {
logger.WithFields(logger.Fields{
"error": err.Error(),
"order": order,
}).Warn("Attempted to delete order that no longer exists")
// TODO(fabio): With the current way the OrderWatcher is written, it is possible for multiple
// events to trigger logic that updates the orders in the DB simultaneously. This is mostly
// benign but is a waste of computation, and causes processes to try and delete orders the
// have already been deleted. In order to fix this, we need to re-write the event handling logic
// to queue the processing of events so that they happen sequentially rather then in parallel.
return nil // Already deleted. Noop.
if _, ok := err.(db.ConflictingOperationsError); ok {
logger.WithFields(logger.Fields{
"error": err.Error(),
"order": order,
}).Error("Failed to permanently delete order")
return nil
}
if _, ok := err.(db.NotFoundError); ok {
return nil // Already deleted. Noop.
}
return err
}

// After permanently deleting an order, we also remove it's assetData from the Decoder
Expand Down
Loading