Skip to content

Commit

Permalink
leave OrderPQ alone
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Dec 4, 2021
1 parent 80ac931 commit a367380
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 154 deletions.
4 changes: 2 additions & 2 deletions server/book/accounts.go
Expand Up @@ -45,10 +45,10 @@ func newAccountTracker(tracking AccountTracking) *accountTracker {
// not need tracking.
var base, quote map[string]map[order.OrderID]*order.LimitOrder
if tracking.base() {
base = make(map[string]map[order.OrderID]*order.LimitOrder)
base = make(map[string]map[order.OrderID]*order.LimitOrder, initBookHalfCapacity)
}
if tracking.quote() {
quote = make(map[string]map[order.OrderID]*order.LimitOrder)
quote = make(map[string]map[order.OrderID]*order.LimitOrder, initBookHalfCapacity)
}
return &accountTracker{
tracking: tracking,
Expand Down
43 changes: 31 additions & 12 deletions server/book/book.go
Expand Up @@ -24,28 +24,31 @@ const (
// removal of orders.
type Book struct {
mtx sync.RWMutex
acctTracking AccountTracking
lotSize uint64
buys *OrderPQ
sells *OrderPQ
acctTracking AccountTracking
acctTracker *accountTracker
}

// New creates a new order book with the given lot size.
func New(lotSize uint64, acctTracking AccountTracking) *Book {
return &Book{
lotSize: lotSize,
buys: NewMaxOrderPQ(initBookHalfCapacity),
sells: NewMinOrderPQ(initBookHalfCapacity),
acctTracking: acctTracking,
buys: NewMaxOrderPQ(initBookHalfCapacity, acctTracking),
sells: NewMinOrderPQ(initBookHalfCapacity, acctTracking),
acctTracker: newAccountTracker(acctTracking),
}
}

// Clear reset the order book with configured capacity.
func (b *Book) Clear() {
b.mtx.Lock()
b.buys, b.sells = nil, nil
b.buys = NewMaxOrderPQ(initBookHalfCapacity, b.acctTracking)
b.sells = NewMinOrderPQ(initBookHalfCapacity, b.acctTracking)
b.buys = NewMaxOrderPQ(initBookHalfCapacity)
b.sells = NewMinOrderPQ(initBookHalfCapacity)
b.acctTracker = newAccountTracker(b.acctTracking)
b.mtx.Unlock()
}

Expand Down Expand Up @@ -97,19 +100,29 @@ func (b *Book) Insert(o *order.LimitOrder) bool {
b.mtx.Lock()
defer b.mtx.Unlock()
if o.Sell {
return b.sells.Insert(o)
if b.sells.Insert(o) {
b.acctTracker.add(o)
return true
}
return false
}
if b.buys.Insert(o) {
b.acctTracker.add(o)
return true
}
return b.buys.Insert(o)
return false
}

// Remove attempts to remove the order with the given OrderID from the book.
func (b *Book) Remove(oid order.OrderID) (*order.LimitOrder, bool) {
b.mtx.Lock()
defer b.mtx.Unlock()
if removed, ok := b.sells.RemoveOrderID(oid); ok {
b.acctTracker.remove(removed)
return removed, true
}
if removed, ok := b.buys.RemoveOrderID(oid); ok {
b.acctTracker.remove(removed)
return removed, true
}
return nil, false
Expand All @@ -118,7 +131,15 @@ func (b *Book) Remove(oid order.OrderID) (*order.LimitOrder, bool) {
// RemoveUserOrders removes all orders from the book that belong to a user. The
// removed buy and sell orders are returned.
func (b *Book) RemoveUserOrders(user account.AccountID) (removedBuys, removedSells []*order.LimitOrder) {
return b.buys.RemoveUserOrders(user), b.sells.RemoveUserOrders(user)
removedBuys = b.buys.RemoveUserOrders(user)
for _, lo := range removedBuys {
b.acctTracker.remove(lo)
}
removedSells = b.sells.RemoveUserOrders(user)
for _, lo := range removedSells {
b.acctTracker.remove(lo)
}
return
}

// HaveOrder checks if an order is in either the buy or sell side of the book.
Expand Down Expand Up @@ -191,15 +212,13 @@ func (b *Book) UnfilledUserSells(user account.AccountID) []*order.LimitOrder {
func (b *Book) IterateBaseAccount(acctAddr string, f func(lo *order.LimitOrder)) {
b.mtx.RLock()
defer b.mtx.RUnlock()
b.sells.IterateBaseAccount(acctAddr, f)
b.buys.IterateBaseAccount(acctAddr, f)
b.acctTracker.iterateBaseAccount(acctAddr, f)
}

// IterateQuoteAccount calls the provided function for every tracked order with
// a quote asset corresponding to the specified account address.
func (b *Book) IterateQuoteAccount(acctAddr string, f func(lo *order.LimitOrder)) {
b.mtx.RLock()
defer b.mtx.RUnlock()
b.sells.IterateQuoteAccount(acctAddr, f)
b.buys.IterateQuoteAccount(acctAddr, f)
b.acctTracker.iterateQuoteAccount(acctAddr, f)
}
75 changes: 68 additions & 7 deletions server/book/book_test.go
Expand Up @@ -32,10 +32,6 @@ func startLogger() {
}

func newLimitOrder(sell bool, rate, quantityLots uint64, force order.TimeInForce, timeOffset int64) *order.LimitOrder {
addr := "DcqXswjTPnUcd4FRCkX4vRJxmVtfgGVa5ui"
if sell {
addr = "149RQGLaHf2gGiL4NXZdH7aA8nYEuLLrgm"
}
return &order.LimitOrder{
P: order.Prefix{
AccountID: acct0,
Expand All @@ -46,10 +42,10 @@ func newLimitOrder(sell bool, rate, quantityLots uint64, force order.TimeInForce
ServerTime: time.Unix(1566497656+timeOffset, 0),
},
T: order.Trade{
Coins: []order.CoinID{},
Coins: []order.CoinID{[]byte(newFakeAddr())},
Sell: sell,
Quantity: quantityLots * LotSize,
Address: addr,
Address: newFakeAddr(),
},
Rate: rate,
Force: force,
Expand Down Expand Up @@ -91,7 +87,7 @@ var (
func newBook(t *testing.T) *Book {
resetMakers()

b := New(LotSize, 0)
b := New(LotSize, AccountTrackingBase|AccountTrackingQuote)

for _, o := range bookBuyOrders {
if ok := b.Insert(o); !ok {
Expand Down Expand Up @@ -265,3 +261,68 @@ func TestBook(t *testing.T) {
t.Errorf("buy side was not empty after Clear")
}
}

func TestAccountTracking(t *testing.T) {
firstSell := bookSellOrders[len(bookSellOrders)-1]
allOrders := append(bookBuyOrders, bookSellOrders...)

// Max oriented queue
b := newBook(t)
for _, lo := range allOrders {
b.Insert(lo)
}

if len(b.acctTracker.base) == 0 {
t.Fatalf("base asset not tracked")
}

if len(b.acctTracker.quote) == 0 {
t.Fatalf("quote asset not tracked")
}

// Check each order and make sure it's where we expect.
for _, ord := range allOrders {
// they are all buy orders
baseAccount := ord.BaseAccount()
ords, found := b.acctTracker.base[baseAccount]
if !found {
t.Fatalf("base order account not found")
}
_, found = ords[ord.ID()]
if !found {
t.Fatalf("base order not found")
}

quoteAccount := ord.QuoteAccount()
ords, found = b.acctTracker.quote[quoteAccount]
if !found {
t.Fatalf("quote order account not found")
}
_, found = ords[ord.ID()]
if !found {
t.Fatalf("quote order not found")
}
}

// Check that our first seller has two orders.
if len(b.acctTracker.base[firstSell.BaseAccount()]) != 2 {
t.Fatalf("didn't track two base orders for first user")
}

if len(b.acctTracker.quote[firstSell.QuoteAccount()]) != 2 {
t.Fatalf("didn't track two quote orders for first user")
}

// Remove them all.
for _, lo := range allOrders {
b.Remove(lo.ID())
}

if len(b.acctTracker.base) != 0 {
t.Fatalf("base asset not cleared")
}

if len(b.acctTracker.quote) != 0 {
t.Fatalf("quote asset not cleared")
}
}
58 changes: 17 additions & 41 deletions server/book/orderpq.go
Expand Up @@ -25,13 +25,12 @@ type orderHeap []*orderEntry
// price rate. A max-oriented queue with highest rates on top is constructed via
// NewMaxOrderPQ, while a min-oriented queue is constructed via NewMinOrderPQ.
type OrderPQ struct {
mtx sync.RWMutex
oh orderHeap
capacity uint32
lessFn func(bi, bj *order.LimitOrder) bool
orders map[order.OrderID]*orderEntry
userOrders map[account.AccountID]map[order.OrderID]*order.LimitOrder
acctTracker *accountTracker
mtx sync.RWMutex
oh orderHeap
capacity uint32
lessFn func(bi, bj *order.LimitOrder) bool
orders map[order.OrderID]*orderEntry
userOrders map[account.AccountID]map[order.OrderID]*order.LimitOrder
}

// Copy makes a deep copy of the OrderPQ. The orders are the same; each
Expand All @@ -56,7 +55,6 @@ func (pq *OrderPQ) realloc(newCap uint32) {
pq.orders = newPQ.orders
pq.oh = newPQ.oh
pq.userOrders = newPQ.userOrders
pq.acctTracker = newPQ.acctTracker
}

// Cap returns the current capacity of the OrderPQ.
Expand All @@ -78,8 +76,6 @@ func (pq *OrderPQ) push(oe *orderEntry) {
} else {
pq.userOrders[lo.AccountID] = map[order.OrderID]*order.LimitOrder{oid: lo}
}
// Track accounts.
pq.acctTracker.add(oe.order)
}

// copy makes a deep copy of the OrderPQ. The orders are the same; each
Expand All @@ -91,7 +87,7 @@ func (pq *OrderPQ) copy(newCap uint32) *OrderPQ {
}
// Initialize the new OrderPQ.
// TODO: Pre-allocate the accountTracker somehow.
newPQ := newOrderPQ(newCap, pq.lessFn, pq.acctTracker.tracking)
newPQ := newOrderPQ(newCap, pq.lessFn)
newPQ.userOrders = make(map[account.AccountID]map[order.OrderID]*order.LimitOrder, len(pq.userOrders))
for aid, uos := range pq.userOrders {
newPQ.userOrders[aid] = make(map[order.OrderID]*order.LimitOrder, len(uos)) // actual *LimitOrders copied in push
Expand Down Expand Up @@ -126,22 +122,6 @@ func (pq *OrderPQ) UnfilledForUser(user account.AccountID) []*order.LimitOrder {
return orders
}

// IterateBaseAccount calls the provided function for every tracked order with
// a base asset corresponding to the specified account address.
func (pq *OrderPQ) IterateBaseAccount(acctAddr string, f func(*order.LimitOrder)) {
pq.mtx.RLock()
pq.acctTracker.iterateBaseAccount(acctAddr, f)
pq.mtx.RUnlock()
}

// IterateQuoteAccount calls the provided function for every tracked order with
// a quote asset corresponding to the specified account address.
func (pq *OrderPQ) IterateQuoteAccount(acctAddr string, f func(*order.LimitOrder)) {
pq.mtx.RLock()
pq.acctTracker.iterateQuoteAccount(acctAddr, f)
pq.mtx.RUnlock()
}

// Orders copies all orders, sorted with the lessFn. The OrderPQ is unmodified.
func (pq *OrderPQ) Orders() []*order.LimitOrder {
// Deep copy the orders.
Expand Down Expand Up @@ -194,25 +174,24 @@ func (pq *OrderPQ) ExtractN(count int) []*order.LimitOrder {
// NewMinOrderPQ is the constructor for OrderPQ that initializes an empty heap
// with the given capacity, and sets the default LessFn for a min heap. Use
// OrderPQ.SetLessFn to redefine the comparator.
func NewMinOrderPQ(capacity uint32, acctTracking AccountTracking) *OrderPQ {
return newOrderPQ(capacity, LessByPriceThenTime, acctTracking)
func NewMinOrderPQ(capacity uint32) *OrderPQ {
return newOrderPQ(capacity, LessByPriceThenTime)
}

// NewMaxOrderPQ is the constructor for OrderPQ that initializes an empty heap
// with the given capacity, and sets the default LessFn for a max heap. Use
// OrderPQ.SetLessFn to redefine the comparator.
func NewMaxOrderPQ(capacity uint32, acctTracking AccountTracking) *OrderPQ {
return newOrderPQ(capacity, GreaterByPriceThenTime, acctTracking)
func NewMaxOrderPQ(capacity uint32) *OrderPQ {
return newOrderPQ(capacity, GreaterByPriceThenTime)
}

func newOrderPQ(cap uint32, lessFn func(bi, bj *order.LimitOrder) bool, acctTracking AccountTracking) *OrderPQ {
func newOrderPQ(cap uint32, lessFn func(bi, bj *order.LimitOrder) bool) *OrderPQ {
return &OrderPQ{
oh: make(orderHeap, 0, cap),
capacity: cap,
lessFn: lessFn,
orders: make(map[order.OrderID]*orderEntry, cap),
userOrders: make(map[account.AccountID]map[order.OrderID]*order.LimitOrder),
acctTracker: newAccountTracker(acctTracking),
oh: make(orderHeap, 0, cap),
capacity: cap,
lessFn: lessFn,
orders: make(map[order.OrderID]*orderEntry, cap),
userOrders: make(map[account.AccountID]map[order.OrderID]*order.LimitOrder),
}
}

Expand Down Expand Up @@ -309,7 +288,6 @@ func (pq *OrderPQ) Pop() interface{} {
} else {
fmt.Printf("(*OrderPQ).Pop: no userOrders for %v found when popping order %v!", user, oid)
}
pq.acctTracker.remove(lo)

// If the heap has shrunk well below capacity, realloc smaller.
if pq.capacity > deallocThresh {
Expand Down Expand Up @@ -415,7 +393,6 @@ func (pq *OrderPQ) Reset(orders []*order.LimitOrder) {
pq.oh = make([]*orderEntry, 0, len(orders))
pq.orders = make(map[order.OrderID]*orderEntry, len(pq.oh))
pq.userOrders = make(map[account.AccountID]map[order.OrderID]*order.LimitOrder)
pq.acctTracker = newAccountTracker(pq.acctTracker.tracking)
for i, lo := range orders {
entry := &orderEntry{
order: lo,
Expand Down Expand Up @@ -493,7 +470,6 @@ func (pq *OrderPQ) RemoveUserOrders(user account.AccountID) (removed []*order.Li
removed = make([]*order.LimitOrder, 0, len(uos))
for oid, lo := range uos {
pq.removeOrder(pq.orders[oid])
pq.acctTracker.remove(lo)
removed = append(removed, lo)
}
return
Expand Down

0 comments on commit a367380

Please sign in to comment.