-
-
Notifications
You must be signed in to change notification settings - Fork 274
/
activeorderbook.go
201 lines (162 loc) · 4.9 KB
/
activeorderbook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package bbgo
import (
"context"
"encoding/json"
"time"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
// CancelOrderWaitTime is the interval GracefulCancel passes to waitAllClear as
// the pause between two checks of whether the book has become empty.
const CancelOrderWaitTime = 20 * time.Millisecond
// ActiveOrderBook manages the local active order books.
//
// It tracks orders in a types.SyncOrderMap and is updated from the order
// updates of a stream once BindStream has been called.
//
//go:generate callbackgen -type ActiveOrderBook
type ActiveOrderBook struct {
	// Symbol is the market symbol of the book. When it is non-empty,
	// orderUpdateHandler ignores updates that carry a different symbol.
	Symbol string

	// orders holds the orders currently tracked by the book, looked up by
	// their OrderID in Exists and Remove.
	orders *types.SyncOrderMap

	// filledCallbacks are the handlers for filled orders. EmitFilled, which is
	// called when a filled order was removed from the book, is not defined in
	// this file (presumably generated by callbackgen — confirm).
	filledCallbacks []func(o types.Order)
}
// NewActiveOrderBook returns an empty ActiveOrderBook for the given symbol,
// backed by a freshly created order map.
func NewActiveOrderBook(symbol string) *ActiveOrderBook {
	book := new(ActiveOrderBook)
	book.Symbol = symbol
	book.orders = types.NewSyncOrderMap()
	return book
}
// MarshalJSON implements json.Marshaler by encoding the result of Backup, so
// the book serializes as its backed-up orders rather than as a struct.
func (b *ActiveOrderBook) MarshalJSON() ([]byte, error) {
	return json.Marshal(b.Backup())
}
// Backup returns the submit orders produced by the underlying order map's
// Backup method.
func (b *ActiveOrderBook) Backup() []types.SubmitOrder {
	snapshot := b.orders.Backup()
	return snapshot
}
// BindStream registers the book's order update handler on the given stream so
// that order updates from the stream are applied to the book.
func (b *ActiveOrderBook) BindStream(stream types.Stream) {
	handler := b.orderUpdateHandler
	stream.OnOrderUpdate(handler)
}
// waitAllClear polls the book every waitTime until it holds no orders, the
// timeout elapses, or ctx is done, whichever happens first.
//
// It returns whether the book was empty at the moment it stopped waiting. The
// error is non-nil only when ctx ended the wait, in which case it is
// ctx.Err(); running into the timeout is not reported as an error.
func (b *ActiveOrderBook) waitAllClear(ctx context.Context, waitTime, timeout time.Duration) (bool, error) {
	if b.NumOfOrders() == 0 {
		return true, nil
	}

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	// Use a timer instead of time.Sleep so that cancellation and the timeout
	// can interrupt the pause between two checks. A timer (not a ticker) is
	// used because it also accepts a non-positive waitTime.
	poll := time.NewTimer(waitTime)
	defer poll.Stop()

	for {
		select {
		case <-ctx.Done():
			return b.NumOfOrders() == 0, ctx.Err()

		case <-deadline.C:
			return b.NumOfOrders() == 0, nil

		case <-poll.C:
			if b.NumOfOrders() == 0 {
				return true, nil
			}
			// poll.C has just been drained, so Reset is safe here.
			poll.Reset(waitTime)
		}
	}
}
// GracefulCancel cancels the active orders gracefully.
//
// In back-testing mode it sends a single cancel request for all tracked orders
// and returns its error. Otherwise it repeats the following until the book is
// empty or ctx is done:
//
//  1. request cancellation of all tracked orders (errors are only logged),
//  2. wait up to 5 seconds for the book to become empty,
//  3. if orders remain, query the open orders per symbol via the RESTful API
//     and drop the tracked orders that are no longer open on the exchange.
//
// Outside of back-testing the returned error is always nil; when ctx ends the
// wait while orders are still tracked, a warning is logged instead.
func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange) error {
	// optimize order cancel for back-testing
	if IsBackTesting {
		orders := b.Orders()
		return ex.CancelOrders(context.Background(), orders...)
	}

	log.Debugf("[ActiveOrderBook] gracefully cancelling %s orders...", b.Symbol)
	waitTime := CancelOrderWaitTime

	startTime := time.Now()
	// ensure every order is cancelled
	for {
		orders := b.Orders()

		// Some orders in the variable are not created on the server side yet,
		// If we cancel these orders directly, we will get an unsent order error
		// We wait here for a while for server to create these orders.
		// time.Sleep(SentOrderWaitTime)

		// since ctx might be canceled, we should use background context here
		if err := ex.CancelOrders(context.Background(), orders...); err != nil {
			log.WithError(err).Errorf("[ActiveOrderBook] can not cancel %s orders", b.Symbol)
		}

		log.Debugf("[ActiveOrderBook] waiting %s for %s orders to be cancelled...", waitTime, b.Symbol)

		clear, err := b.waitAllClear(ctx, waitTime, 5*time.Second)
		if clear {
			break
		}

		if err != nil {
			// The wait was interrupted with orders still tracked; do not
			// report this as a successful cancellation.
			log.WithError(err).Warnf("[ActiveOrderBook] stopped waiting for %s orders to be cancelled, %d orders are still active", b.Symbol, b.NumOfOrders())
			return nil
		}

		log.Warnf("[ActiveOrderBook] %d %s orders are not cancelled yet:", b.NumOfOrders(), b.Symbol)
		b.Print()

		// verify the current open orders via the RESTful API
		log.Warnf("[ActiveOrderBook] using REStful API to verify active orders...")

		orders = b.Orders()
		var symbols = map[string]struct{}{}
		for _, order := range orders {
			symbols[order.Symbol] = struct{}{}
		}

		for symbol := range symbols {
			openOrders, err := ex.QueryOpenOrders(ctx, symbol)
			if err != nil {
				log.WithError(err).Errorf("can not query %s open orders", symbol)
				continue
			}

			openOrderStore := NewOrderStore(symbol)
			openOrderStore.Add(openOrders...)
			for _, o := range orders {
				// The store only knows the open orders of this symbol, so
				// orders of other symbols must not be judged against it.
				if o.Symbol != symbol {
					continue
				}

				// if it's not on the order book (open orders), we should remove it from our local side
				if !openOrderStore.Exists(o.OrderID) {
					b.Remove(o)
				}
			}
		}
	}

	log.Debugf("[ActiveOrderBook] all %s orders are cancelled successfully in %s", b.Symbol, time.Since(startTime))
	return nil
}
// orderUpdateHandler applies a single order update to the book.
//
// Updates for a symbol other than the book's non-empty Symbol are ignored.
// New and partially filled orders are passed to Update, filled orders are
// removed and reported through EmitFilled (only if they were removed), and
// canceled or rejected orders are removed. Any other status is logged as
// unhandled.
func (b *ActiveOrderBook) orderUpdateHandler(order types.Order) {
	if b.Symbol != "" && b.Symbol != order.Symbol {
		return
	}

	switch status := order.Status; status {
	case types.OrderStatusNew, types.OrderStatusPartiallyFilled:
		b.Update(order)

	case types.OrderStatusFilled:
		// only emit the filled event for an order that Remove reports as removed
		removed := b.Remove(order)
		if removed {
			b.EmitFilled(order)
		}

	case types.OrderStatusRejected, types.OrderStatusCanceled:
		log.Debugf("[ActiveOrderBook] order status %s, removing order %s", status, order)
		b.Remove(order)

	default:
		log.Warnf("unhandled order status: %s", status)
	}
}
// Print logs every tracked order at info level, one log entry per order.
func (b *ActiveOrderBook) Print() {
	tracked := b.orders.Orders()
	for i := range tracked {
		log.Infof("%s", tracked[i])
	}
}
// Update passes the given orders to the underlying order map's Update method.
//
// When the book has a non-empty Symbol, orders of other symbols are skipped.
// A book with an empty Symbol accepts orders of every symbol, which matches
// the filtering done by orderUpdateHandler.
func (b *ActiveOrderBook) Update(orders ...types.Order) {
	hasSymbol := len(b.Symbol) > 0
	for _, order := range orders {
		// skip only on a real mismatch; an unset Symbol must not drop orders
		if hasSymbol && b.Symbol != order.Symbol {
			continue
		}

		b.orders.Update(order)
	}
}
// Add passes the given orders to the underlying order map's Add method.
//
// When the book has a non-empty Symbol, orders of other symbols are skipped.
// A book with an empty Symbol accepts orders of every symbol, which matches
// the filtering done by orderUpdateHandler.
func (b *ActiveOrderBook) Add(orders ...types.Order) {
	hasSymbol := len(b.Symbol) > 0
	for _, order := range orders {
		// skip only on a real mismatch; an unset Symbol must not drop orders
		if hasSymbol && b.Symbol != order.Symbol {
			continue
		}

		b.orders.Add(order)
	}
}
// Exists returns the result of looking up the order's OrderID in the
// underlying order map.
func (b *ActiveOrderBook) Exists(order types.Order) bool {
	found := b.orders.Exists(order.OrderID)
	return found
}
// Remove asks the underlying order map to remove the entry with the order's
// OrderID and returns the map's result.
func (b *ActiveOrderBook) Remove(order types.Order) bool {
	removed := b.orders.Remove(order.OrderID)
	return removed
}
// NumOfOrders returns the length reported by the underlying order map.
func (b *ActiveOrderBook) NumOfOrders() int {
	count := b.orders.Len()
	return count
}
// Orders returns the orders reported by the underlying order map.
func (b *ActiveOrderBook) Orders() types.OrderSlice {
	tracked := b.orders.Orders()
	return tracked
}