-
-
Notifications
You must be signed in to change notification settings - Fork 273
/
tradecollector.go
231 lines (189 loc) · 5.7 KB
/
tradecollector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package bbgo
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)
// TradeCollector matches incoming trades against the orders held in its order
// store, applies matched trades to the position (when one is set) and notifies
// the registered callbacks.
//
//go:generate callbackgen -type TradeCollector
type TradeCollector struct {
// Symbol is the symbol given to NewTradeCollector.
Symbol string
// orderSig is signalled by Emit; the Run loop calls Process when it fires.
orderSig sigchan.Chan
// tradeStore holds trades that had no matching order yet; Process re-checks them.
tradeStore *TradeStore
// tradeC carries trades queued by QueueTrade to the Run loop.
tradeC chan types.Trade
// position receives matched trades via AddTrade; it may be nil, in which case
// trades are still emitted but no profit or position update is produced.
position *types.Position
// orderStore is consulted (Exists) to decide whether a trade belongs to us.
orderStore *OrderStore
// doneTrades is the set of trade keys that were already processed.
doneTrades map[types.TradeKey]struct{}
// mu is held by Process, processTrade and setDone while they touch doneTrades.
mu sync.Mutex
// The callback slices below are presumably appended to and invoked by the
// On*/Emit* methods produced by callbackgen — NOTE(review): generated code is
// not visible in this file, confirm.
recoverCallbacks []func(trade types.Trade)
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
positionUpdateCallbacks []func(position *types.Position)
profitCallbacks []func(trade types.Trade, profit *types.Profit)
}
// NewTradeCollector builds a TradeCollector for the given symbol that applies
// matched trades to position and looks orders up in orderStore.
//
// The collector starts with an empty trade store, an empty set of done trades,
// an order signal channel of size 1 and a trade channel buffered to 100 trades.
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
	collector := &TradeCollector{
		Symbol:     symbol,
		position:   position,
		orderStore: orderStore,
	}

	collector.orderSig = sigchan.New(1)
	collector.tradeC = make(chan types.Trade, 100)
	collector.tradeStore = NewTradeStore()
	collector.doneTrades = make(map[types.TradeKey]struct{})

	return collector
}
// OrderStore exposes the order store the collector consults when deciding
// whether a trade belongs to one of our orders.
func (c *TradeCollector) OrderStore() *OrderStore {
	store := c.orderStore
	return store
}
// Position exposes the position that matched trades are added to.
// The result is nil when the collector has no position attached.
func (c *TradeCollector) Position() *types.Position {
	pos := c.position
	return pos
}
// SetPosition replaces the position that matched trades are added to.
// The field is assigned directly, without taking the collector's mutex.
func (c *TradeCollector) SetPosition(position *types.Position) {
	c.position = position
}
// QueueTrade hands the trade over to the trade channel so that the Run
// goroutine can pick it up and process it in the background.
// The send blocks while the channel buffer is full.
func (c *TradeCollector) QueueTrade(trade types.Trade) {
	queue := c.tradeC
	queue <- trade
}
// BindStreamForBackground subscribes to the stream's trade updates and queues
// every received trade for background processing (see QueueTrade).
func (c *TradeCollector) BindStreamForBackground(stream types.Stream) {
	stream.OnTradeUpdate(func(trade types.Trade) {
		c.QueueTrade(trade)
	})
}
// BindStream subscribes to the stream's trade updates and processes every
// received trade synchronously inside the stream callback.
func (c *TradeCollector) BindStream(stream types.Stream) {
	onTrade := func(trade types.Trade) {
		// The "was it added" result is of no use to the stream.
		_ = c.ProcessTrade(trade)
	}
	stream.OnTradeUpdate(onTrade)
}
// Emit signals the collector that the order store may have changed.
// Call it after submitting orders: the Run loop reacts to the signal by
// running Process, which re-checks the pending trades against the order store.
func (c *TradeCollector) Emit() {
	sig := c.orderSig
	sig.Emit()
}
// Recover queries the trades of symbol since from through the given trade
// history service and feeds each of them to ProcessTrade.
//
// For every trade that ProcessTrade reports as added, the recover callbacks
// are emitted. The query error, if any, is returned unchanged.
func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error {
	options := &types.TradeQueryOptions{
		StartTime: &from,
	}

	history, err := ex.QueryTrades(ctx, symbol, options)
	if err != nil {
		return err
	}

	for _, recovered := range history {
		log.Debugf("processing trade: %s", recovered.String())

		// Trades that were already done or have no matching order are skipped.
		if !c.ProcessTrade(recovered) {
			continue
		}

		log.Infof("recovered trade: %s", recovered.String())
		c.EmitRecover(recovered)
	}

	return nil
}
// setDone records the trade key as processed while holding the mutex.
func (c *TradeCollector) setDone(key types.TradeKey) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.doneTrades[key] = struct{}{}
}
// Process walks the pending trades in the trade store and handles every trade
// whose order is present in the order store: the trade is added to the
// position (if one is set), the trade/profit callbacks are emitted and the
// trade is marked as done.
//
// Trades that are done (now or previously) are reported to the trade store
// filter with true; all others with false. A single position update is
// emitted after the walk when at least one trade changed the position.
//
// It returns true when the position was changed.
func (c *TradeCollector) Process() bool {
	changed := false

	handle := func(trade types.Trade) bool {
		tradeKey := trade.Key()

		c.mu.Lock()
		defer c.mu.Unlock()

		// Already handled earlier: nothing more to do for this trade.
		if _, seen := c.doneTrades[tradeKey]; seen {
			return true
		}

		// No matching order yet: leave the trade pending.
		if !c.orderStore.Exists(trade.OrderID) {
			return false
		}

		if c.position == nil {
			c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
		} else {
			profit, netProfit, madeProfit := c.position.AddTrade(trade)
			if madeProfit {
				p := c.position.NewProfit(trade, profit, netProfit)
				c.EmitTrade(trade, profit, netProfit)
				c.EmitProfit(trade, &p)
			} else {
				c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
				c.EmitProfit(trade, nil)
			}
			changed = true
		}

		c.doneTrades[tradeKey] = struct{}{}
		return true
	}

	c.tradeStore.Filter(handle)

	if changed && c.position != nil {
		c.EmitPositionUpdate(c.position)
	}

	return changed
}
// processTrade handles a single trade under the collector's mutex.
//
// When the trade is not done yet and its order exists in the order store, the
// trade is added to the position (if one is set), the trade/profit/position
// callbacks are emitted, the trade is marked as done and true is returned.
//
// It returns false when the trade was already done or has no matching order;
// in that case nothing is emitted and nothing is recorded.
func (c *TradeCollector) processTrade(trade types.Trade) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	tradeKey := trade.Key()

	// A trade is only ever applied once.
	if _, seen := c.doneTrades[tradeKey]; seen {
		return false
	}

	// Without a matching order the trade is not ours (yet).
	if !c.orderStore.Exists(trade.OrderID) {
		return false
	}

	if c.position == nil {
		c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
	} else {
		profit, netProfit, madeProfit := c.position.AddTrade(trade)
		if madeProfit {
			p := c.position.NewProfit(trade, profit, netProfit)
			c.EmitTrade(trade, profit, netProfit)
			c.EmitProfit(trade, &p)
		} else {
			c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
			c.EmitProfit(trade, nil)
		}
		c.EmitPositionUpdate(c.position)
	}

	c.doneTrades[tradeKey] = struct{}{}
	return true
}
// ProcessTrade tries to apply the given trade immediately.
//
// It returns true when the trade matched an order and was added. It returns
// false when the trade was already done, or when no matching order exists
// yet; in the latter case the trade is kept in the trade store so that a later
// Process call can pick it up.
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
	key := trade.Key()

	// doneTrades is written under c.mu by Process, processTrade and setDone,
	// so the lookup must hold the mutex as well; an unguarded read races with
	// those writers. The lock is released before calling processTrade because
	// processTrade acquires it again and sync.Mutex is not reentrant.
	c.mu.Lock()
	_, done := c.doneTrades[key]
	c.mu.Unlock()

	// Already applied: do not process it again and do not store it.
	if done {
		return false
	}

	if c.processTrade(trade) {
		return true
	}

	// No matching order yet: keep the trade for a later Process round.
	c.tradeStore.Add(trade)
	return false
}
// Run is the background loop of the collector; start it in its own goroutine.
// Do not use this function if you need back-testing.
//
// Until ctx is done, it:
//   - runs Process every 3 seconds,
//   - runs Process whenever Emit signals an order store update,
//   - runs ProcessTrade for every trade queued through QueueTrade.
func (c *TradeCollector) Run(ctx context.Context) {
	ticker := time.NewTicker(3 * time.Second)
	// Release the ticker's resources once the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return

		case <-ticker.C:
			c.Process()

		case <-c.orderSig:
			c.Process()

		case trade := <-c.tradeC:
			c.ProcessTrade(trade)
		}
	}
}