Skip to content

Commit

Permalink
Market.PurgeBook only unlocks coins for purged orders
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed May 14, 2020
1 parent 2e3c2ba commit 19a9e77
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 33 deletions.
2 changes: 1 addition & 1 deletion dex/msgjson/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ const (
Notification // 3
)

// String satisfies the Stringer interface for tranlating the MessageType code
// String satisfies the Stringer interface for translating the MessageType code
// into a description, primarily for logging.
func (mt MessageType) String() string {
switch mt {
Expand Down
2 changes: 1 addition & 1 deletion server/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
log slog.Logger
)

// SvrCore is satisfied by core.Core.
// SvrCore is satisfied by server/dex.DEX.
type SvrCore interface {
ConfigMsg() json.RawMessage
MarketRunning(mktName string) (found, running bool)
Expand Down
2 changes: 1 addition & 1 deletion server/db/driver/pg/internal/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ const (
)
INSERT INTO %s -- archived orders table for market X
SELECT * FROM moved
RETURNING oid, account_id;`
RETURNING oid, sell, account_id;`

// CreateCancelOrdersTable creates a table specified via the %s printf
// specifier for cancel orders.
Expand Down
39 changes: 26 additions & 13 deletions server/db/driver/pg/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,11 @@ func makePseudoCancel(target order.OrderID, user account.AccountID, base, quote
}

// FlushBook revokes all booked orders for a market.
func (a *Archiver) FlushBook(base, quote uint32) (int, error) {
marketSchema, err := a.marketSchema(base, quote)
func (a *Archiver) FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error) {
var marketSchema string
marketSchema, err = a.marketSchema(base, quote)
if err != nil {
return 0, err
return
}

// Booked orders (active) are made revoked (archived).
Expand All @@ -220,39 +221,49 @@ func (a *Archiver) FlushBook(base, quote uint32) (int, error) {

timeStamp := time.Now().Truncate(time.Millisecond).UTC()

dbTx, err := a.db.Begin()
var dbTx *sql.Tx
dbTx, err = a.db.Begin()
if err != nil {
return 0, fmt.Errorf("failed to begin database transaction: %v", err)
err = fmt.Errorf("failed to begin database transaction: %v", err)
return
}

fail := func() {
sellsRemoved, buysRemoved = nil, nil
a.fatalBackendErr(err)
_ = dbTx.Rollback()
}

// Changed all booked orders to revoked.
stmt := fmt.Sprintf(internal.PurgeBook, srcTableName, orderStatusRevoked, dstTableName)
rows, err := dbTx.Query(stmt, orderStatusBooked)
var rows *sql.Rows
rows, err = dbTx.Query(stmt, orderStatusBooked)
if err != nil {
fail()
return 0, err
return
}

var cos []*order.CancelOrder
for rows.Next() {
var oid order.OrderID
var sell bool
var aid account.AccountID
if err = rows.Scan(&oid, &aid); err != nil {
if err = rows.Scan(&oid, &sell, &aid); err != nil {
rows.Close()
fail()
return 0, err
return
}
cos = append(cos, makePseudoCancel(oid, aid, base, quote, timeStamp))
if sell {
sellsRemoved = append(sellsRemoved, oid)
} else {
buysRemoved = append(buysRemoved, oid)
}
}

if err = rows.Err(); err != nil {
fail()
return 0, err
return
}

// Insert the pseudo-cancel orders.
Expand All @@ -263,16 +274,18 @@ func (a *Archiver) FlushBook(base, quote uint32) (int, error) {
co.ServerTime, co.Commit, co.TargetOrderID, orderStatusRevoked, 0, 0)
if err != nil {
fail()
return 0, fmt.Errorf("failed to store pseudo-cancel order: %v", err)
err = fmt.Errorf("failed to store pseudo-cancel order: %v", err)
return
}
}

if err = dbTx.Commit(); err != nil {
fail()
return 0, fmt.Errorf("failed to commit transaction: %v", err)
err = fmt.Errorf("failed to commit transaction: %v", err)
return
}

return len(cos), nil
return
}

// BookOrders retrieves all booked orders (with order status booked) for the
Expand Down
12 changes: 9 additions & 3 deletions server/db/driver/pg/orders_online_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,18 @@ func TestFlushBook(t *testing.T) {
t.Fatalf("StoreOrder failed: %v", err)
}

numFlushed, err := archie.FlushBook(lo.BaseAsset, lo.QuoteAsset)
sellsRemoved, buysRemoved, err := archie.FlushBook(lo.BaseAsset, lo.QuoteAsset)
if err != nil {
t.Fatalf("FlushBook failed: %v", err)
}
if numFlushed != 1 {
t.Fatalf("flushed %d book order, expected 1", numFlushed)
if len(sellsRemoved) != 0 {
t.Fatalf("flushed %d book sell orders, expected 0", len(sellsRemoved))
}
if len(buysRemoved) != 1 {
t.Fatalf("flushed %d book buy orders, expected 1", len(buysRemoved))
}
if buysRemoved[0] != lo.ID() {
t.Errorf("flushed sell order has ID %v, expected %v", buysRemoved[0], lo.ID())
}

// Check for new status of the order.
Expand Down
2 changes: 1 addition & 1 deletion server/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type OrderArchiver interface {
BookOrders(base, quote uint32) ([]*order.LimitOrder, error)

// FlushBook revokes all booked orders for a market.
FlushBook(base, quote uint32) (int, error)
FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error)

// ActiveOrderCoins retrieves a CoinID slice for each active order.
ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error)
Expand Down
29 changes: 19 additions & 10 deletions server/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,20 +478,29 @@ func (m *Market) PurgeBook() {
defer m.bookMtx.Unlock()

// Revoke all booked orders in the DB.
N, err := m.storage.FlushBook(m.marketInfo.Base, m.marketInfo.Quote)
sellsRemoved, buysRemoved, err := m.storage.FlushBook(m.marketInfo.Base, m.marketInfo.Quote)
if err != nil {
log.Errorf("Failed to flush book for market %s: %v", m.marketInfo.Name, err)
} else {
log.Infof("Flushed %d booked orders for market %s", N, m.marketInfo.Name)
log.Infof("Flushed %d sell orders and %d buy orders from market %q book",
len(sellsRemoved), len(buysRemoved), m.marketInfo.Name)
// Clear the in-memory order book to match the DB.
m.book.Clear()
// Unlock all coins. TODO: only unlock previously booked order coins, do
// not include coins that might belong to orders still in epoch status.
// This won't matter if the market is suspended, but it does does if
// PurgeBook is used while the market is still accepting new orders and
// processing epochs.
m.coinLockerBase.UnlockAll()
m.coinLockerQuote.UnlockAll()
// Unlock coins for removed orders.

// TODO: only unlock previously booked order coins, do not include coins
// that might belong to orders still in epoch status. This won't matter
// if the market is suspended, but it does if PurgeBook is used while
// the market is still accepting new orders and processing epochs.

// Unlock base asset coins locked by sell orders.
for i := range sellsRemoved {
m.coinLockerBase.UnlockOrderCoins(sellsRemoved[i])
}
// Unlock quote asset coins locked by buy orders.
for i := range buysRemoved {
m.coinLockerQuote.UnlockOrderCoins(buysRemoved[i])
}
}
}

Expand All @@ -513,7 +522,7 @@ func (m *Market) Run(ctx context.Context) {
// Drain the order router of incoming orders that made it in after the
// main loop broke and before flagging the market stopped. Do this in a
// goroutine because the market is flagged as stopped under runMtx lock
// in this defer and their is a risk of deadlock in SubmitOrderAsync
// in this defer and there is a risk of deadlock in SubmitOrderAsync
// that sends under runMtx lock as well.
wgFeeds.Add(1)
go func() {
Expand Down
12 changes: 9 additions & 3 deletions server/market/market_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ func (ta *TArchivist) BookOrders(base, quote uint32) ([]*order.LimitOrder, error
defer ta.mtx.Unlock()
return ta.bookedOrders, nil
}
func (ta *TArchivist) FlushBook(base, quote uint32) (int, error) {
func (ta *TArchivist) FlushBook(base, quote uint32) (sells, buys []order.OrderID, err error) {
ta.mtx.Lock()
defer ta.mtx.Unlock()
N := len(ta.bookedOrders)
for _, lo := range ta.bookedOrders {
if lo.Sell {
sells = append(sells, lo.ID())
} else {
buys = append(buys, lo.ID())
}
}
ta.bookedOrders = nil
return N, nil
return
}
func (ta *TArchivist) ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error) {
return make(map[order.OrderID][]order.CoinID), make(map[order.OrderID][]order.CoinID), nil
Expand Down

0 comments on commit 19a9e77

Please sign in to comment.