diff --git a/dex/msgjson/types.go b/dex/msgjson/types.go index 3102f894ac..bb38754b62 100644 --- a/dex/msgjson/types.go +++ b/dex/msgjson/types.go @@ -218,7 +218,7 @@ const ( Notification // 3 ) -// String satisfies the Stringer interface for tranlating the MessageType code +// String satisfies the Stringer interface for translating the MessageType code // into a description, primarily for logging. func (mt MessageType) String() string { switch mt { diff --git a/server/admin/server.go b/server/admin/server.go index f02f08cf66..d4b47c3a0e 100644 --- a/server/admin/server.go +++ b/server/admin/server.go @@ -36,7 +36,7 @@ var ( log slog.Logger ) -// SvrCore is satisfied by core.Core. +// SvrCore is satisfied by server/dex.DEX. type SvrCore interface { ConfigMsg() json.RawMessage MarketRunning(mktName string) (found, running bool) diff --git a/server/db/driver/pg/internal/orders.go b/server/db/driver/pg/internal/orders.go index 241a141127..f4b0d8df1e 100644 --- a/server/db/driver/pg/internal/orders.go +++ b/server/db/driver/pg/internal/orders.go @@ -149,7 +149,7 @@ const ( ) INSERT INTO %s -- archived orders table for market X SELECT * FROM moved - RETURNING oid, account_id;` + RETURNING oid, sell, account_id;` // CreateCancelOrdersTable creates a table specified via the %s printf // specifier for cancel orders. diff --git a/server/db/driver/pg/orders.go b/server/db/driver/pg/orders.go index 6714ef9626..e7cd777c80 100644 --- a/server/db/driver/pg/orders.go +++ b/server/db/driver/pg/orders.go @@ -208,10 +208,11 @@ func makePseudoCancel(target order.OrderID, user account.AccountID, base, quote } // FlushBook revokes all booked orders for a market. -func (a *Archiver) FlushBook(base, quote uint32) (int, error) { - marketSchema, err := a.marketSchema(base, quote) +func (a *Archiver) FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error) { + var marketSchema string + marketSchema, err = a.marketSchema(base, quote) if err != nil { - return 0, err + return } // Booked orders (active) are made revoked (archived). @@ -220,39 +221,49 @@ func (a *Archiver) FlushBook(base, quote uint32) (int, error) { timeStamp := time.Now().Truncate(time.Millisecond).UTC() - dbTx, err := a.db.Begin() + var dbTx *sql.Tx + dbTx, err = a.db.Begin() if err != nil { - return 0, fmt.Errorf("failed to begin database transaction: %v", err) + err = fmt.Errorf("failed to begin database transaction: %v", err) + return } fail := func() { + sellsRemoved, buysRemoved = nil, nil a.fatalBackendErr(err) _ = dbTx.Rollback() } // Changed all booked orders to revoked. stmt := fmt.Sprintf(internal.PurgeBook, srcTableName, orderStatusRevoked, dstTableName) - rows, err := dbTx.Query(stmt, orderStatusBooked) + var rows *sql.Rows + rows, err = dbTx.Query(stmt, orderStatusBooked) if err != nil { fail() - return 0, err + return } var cos []*order.CancelOrder for rows.Next() { var oid order.OrderID + var sell bool var aid account.AccountID - if err = rows.Scan(&oid, &aid); err != nil { + if err = rows.Scan(&oid, &sell, &aid); err != nil { rows.Close() fail() - return 0, err + return } cos = append(cos, makePseudoCancel(oid, aid, base, quote, timeStamp)) + if sell { + sellsRemoved = append(sellsRemoved, oid) + } else { + buysRemoved = append(buysRemoved, oid) + } } if err = rows.Err(); err != nil { fail() - return 0, err + return } // Insert the pseudo-cancel orders. @@ -263,16 +274,18 @@ func (a *Archiver) FlushBook(base, quote uint32) (int, error) { co.ServerTime, co.Commit, co.TargetOrderID, orderStatusRevoked, 0, 0) if err != nil { fail() - return 0, fmt.Errorf("failed to store pseudo-cancel order: %v", err) + err = fmt.Errorf("failed to store pseudo-cancel order: %v", err) + return } } if err = dbTx.Commit(); err != nil { fail() - return 0, fmt.Errorf("failed to commit transaction: %v", err) + err = fmt.Errorf("failed to commit transaction: %v", err) + return } - return len(cos), nil + return } // BookOrders retrieves all booked orders (with order status booked) for the diff --git a/server/db/driver/pg/orders_online_test.go b/server/db/driver/pg/orders_online_test.go index b5eb831323..286950656c 100644 --- a/server/db/driver/pg/orders_online_test.go +++ b/server/db/driver/pg/orders_online_test.go @@ -388,12 +388,18 @@ func TestFlushBook(t *testing.T) { t.Fatalf("StoreOrder failed: %v", err) } - numFlushed, err := archie.FlushBook(lo.BaseAsset, lo.QuoteAsset) + sellsRemoved, buysRemoved, err := archie.FlushBook(lo.BaseAsset, lo.QuoteAsset) if err != nil { t.Fatalf("FlushBook failed: %v", err) } - if numFlushed != 1 { - t.Fatalf("flushed %d book order, expected 1", numFlushed) + if len(sellsRemoved) != 0 { + t.Fatalf("flushed %d book sell orders, expected 0", len(sellsRemoved)) + } + if len(buysRemoved) != 1 { + t.Fatalf("flushed %d book buy orders, expected 1", len(buysRemoved)) + } + if buysRemoved[0] != lo.ID() { + t.Errorf("flushed sell order has ID %v, expected %v", buysRemoved[0], lo.ID()) } // Check for new status of the order. diff --git a/server/db/interface.go b/server/db/interface.go index 940c2770a6..08ef84d1fa 100644 --- a/server/db/interface.go +++ b/server/db/interface.go @@ -83,7 +83,7 @@ type OrderArchiver interface { BookOrders(base, quote uint32) ([]*order.LimitOrder, error) // FlushBook revokes all booked orders for a market. - FlushBook(base, quote uint32) (int, error) + FlushBook(base, quote uint32) (sellsRemoved, buysRemoved []order.OrderID, err error) // ActiveOrderCoins retrieves a CoinID slice for each active order. ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error) diff --git a/server/market/market.go b/server/market/market.go index 5a60ab490d..23f8340c5e 100644 --- a/server/market/market.go +++ b/server/market/market.go @@ -478,20 +478,29 @@ func (m *Market) PurgeBook() { defer m.bookMtx.Unlock() // Revoke all booked orders in the DB. - N, err := m.storage.FlushBook(m.marketInfo.Base, m.marketInfo.Quote) + sellsRemoved, buysRemoved, err := m.storage.FlushBook(m.marketInfo.Base, m.marketInfo.Quote) if err != nil { log.Errorf("Failed to flush book for market %s: %v", m.marketInfo.Name, err) } else { - log.Infof("Flushed %d booked orders for market %s", N, m.marketInfo.Name) + log.Infof("Flushed %d sell orders and %d buy orders from market %q book", + len(sellsRemoved), len(buysRemoved), m.marketInfo.Name) // Clear the in-memory order book to match the DB. m.book.Clear() - // Unlock all coins. TODO: only unlock previously booked order coins, do - // not include coins that might belong to orders still in epoch status. - // This won't matter if the market is suspended, but it does does if - // PurgeBook is used while the market is still accepting new orders and - // processing epochs. - m.coinLockerBase.UnlockAll() - m.coinLockerQuote.UnlockAll() + // Unlock coins for removed orders. + + // TODO: only unlock previously booked order coins, do not include coins + // that might belong to orders still in epoch status. This won't matter + // if the market is suspended, but it does if PurgeBook is used while + // the market is still accepting new orders and processing epochs. + + // Unlock base asset coins locked by sell orders. + for i := range sellsRemoved { + m.coinLockerBase.UnlockOrderCoins(sellsRemoved[i]) + } + // Unlock quote asset coins locked by buy orders. + for i := range buysRemoved { + m.coinLockerQuote.UnlockOrderCoins(buysRemoved[i]) + } } } @@ -513,7 +522,7 @@ func (m *Market) Run(ctx context.Context) { // Drain the order router of incoming orders that made it in after the // main loop broke and before flagging the market stopped. Do this in a // goroutine because the market is flagged as stopped under runMtx lock - // in this defer and their is a risk of deadlock in SubmitOrderAsync + // in this defer and there is a risk of deadlock in SubmitOrderAsync // that sends under runMtx lock as well. wgFeeds.Add(1) go func() { diff --git a/server/market/market_test.go b/server/market/market_test.go index bb366dad41..59ad6cb24c 100644 --- a/server/market/market_test.go +++ b/server/market/market_test.go @@ -46,12 +46,18 @@ func (ta *TArchivist) BookOrders(base, quote uint32) ([]*order.LimitOrder, error defer ta.mtx.Unlock() return ta.bookedOrders, nil } -func (ta *TArchivist) FlushBook(base, quote uint32) (int, error) { +func (ta *TArchivist) FlushBook(base, quote uint32) (sells, buys []order.OrderID, err error) { ta.mtx.Lock() defer ta.mtx.Unlock() - N := len(ta.bookedOrders) + for _, lo := range ta.bookedOrders { + if lo.Sell { + sells = append(sells, lo.ID()) + } else { + buys = append(buys, lo.ID()) + } + } ta.bookedOrders = nil - return N, nil + return } func (ta *TArchivist) ActiveOrderCoins(base, quote uint32) (baseCoins, quoteCoins map[order.OrderID][]order.CoinID, err error) { return make(map[order.OrderID][]order.CoinID), make(map[order.OrderID][]order.CoinID), nil