/
ws_public.go
159 lines (146 loc) · 5.15 KB
/
ws_public.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package myokxapi
type WsBooksType string
const (
WS_BOOKS_SNAPSHOT_1_10MS WsBooksType = "bbo-tbt" //首次推1档快照数据,以后定量推送,每10毫秒当1档快照数据有变化推送一次1档数据
WS_BOOKS_SNAPSHOT_5_100MS WsBooksType = "books5" //首次推5档快照数据,以后定量推送,每100毫秒当5档快照数据有变化推送一次5档数据
WS_BOOKS_UPDATE_400_100MS WsBooksType = "books" //首次推400档快照数据,以后增量推送,每100毫秒推送一次变化的数据
WS_BOOKS_UPDATE_400_10MS WsBooksType = "books-l2-tbt" //首次推400档快照数据,以后增量推送,每10毫秒推送一次变化的数据
WS_BOOKS_UPDATE_50_10MS WsBooksType = "books50-l2-tbt" //首次推50档快照数据,以后增量推送,每10毫秒推送一次变化的数据
)
func (wsBooksType WsBooksType) String() string {
return string(wsBooksType)
}
// 批量订阅深度 如: ["BTC-USDT","ETH-USDT"], WS_BOOKS_SNAPSHOT_5_100MS
func (ws *PublicWsStreamClient) SubscribeBooksMultiple(instIds []string, wsBooksType WsBooksType) (*Subscription[WsBooks], error) {
args := []WsSubscribeArg{}
for _, s := range instIds {
arg := getBooksSubscribeArg(s, wsBooksType)
args = append(args, arg)
}
doSub, err := subscribe[WsActionResult](&ws.WsStreamClient, SUBSCRIBE, args)
if err != nil {
return nil, err
}
err = ws.catchSubscribeResult(doSub)
if err != nil {
return nil, err
}
log.Infof("SubscribeBooks Success: args:%v", doSub.Args)
sub := &Subscription[WsBooks]{
SubId: doSub.SubId,
Op: SUBSCRIBE,
Args: doSub.Args,
resultChan: make(chan WsBooks, 50),
errChan: make(chan error),
closeChan: make(chan struct{}),
Ws: &ws.WsStreamClient,
}
for _, arg := range args {
keyData, _ := json.Marshal(&arg)
ws.booksSubMap.Store(string(keyData), sub)
}
return sub, nil
}
// 批量取消订阅深度 如: ["BTC-USDT","ETH-USDT"], WS_BOOKS_SNAPSHOT_5_100MS
func (ws *PublicWsStreamClient) UnSubscribeBooksMultiple(instIds []string, wsBooksType WsBooksType) error {
args := []WsSubscribeArg{}
for _, s := range instIds {
arg := getBooksSubscribeArg(s, wsBooksType)
args = append(args, arg)
}
doSub, err := subscribe[WsActionResult](&ws.WsStreamClient, UNSUBSCRIBE, args)
if err != nil {
return err
}
err = ws.catchSubscribeResult(doSub)
if err != nil {
return err
}
log.Infof("UnSubscribeBooks Success: args:%v", doSub.Args)
for _, arg := range args {
keyData, _ := json.Marshal(&arg)
ws.booksSubMap.Delete(string(keyData))
}
return nil
}
// 订阅单个深度 如: "BTC-USDT", WS_BOOKS_SNAPSHOT_5_100MS
func (ws *PublicWsStreamClient) SubscribeBooks(instIds string, wsBooksType WsBooksType) (*Subscription[WsBooks], error) {
return ws.SubscribeBooksMultiple([]string{instIds}, wsBooksType)
}
// 取消订阅单个深度 如: "BTC-USDT", WS_BOOKS_SNAPSHOT_5_100MS
func (ws *PublicWsStreamClient) UnSubscribeBooks(instIds string, wsBooksType WsBooksType) error {
return ws.UnSubscribeBooksMultiple([]string{instIds}, wsBooksType)
}
func getBooksSubscribeArg(instId string, wsBooksType WsBooksType) WsSubscribeArg {
return WsSubscribeArg{
Channel: wsBooksType.String(),
InstId: instId,
}
}
// 批量订阅交易 如: ["BTC-USDT","ETH-USDT"]
func (ws *PublicWsStreamClient) SubscribeTradesMultiple(instIds []string) (*Subscription[WsTrades], error) {
args := []WsSubscribeArg{}
for _, s := range instIds {
arg := getTradesSubscribeArg(s)
args = append(args, arg)
}
doSub, err := subscribe[WsActionResult](&ws.WsStreamClient, SUBSCRIBE, args)
if err != nil {
return nil, err
}
err = ws.catchSubscribeResult(doSub)
if err != nil {
return nil, err
}
log.Infof("SubscribeTrades Success: args:%v", doSub.Args)
sub := &Subscription[WsTrades]{
SubId: doSub.SubId,
Op: SUBSCRIBE,
Args: doSub.Args,
resultChan: make(chan WsTrades, 50),
errChan: make(chan error),
closeChan: make(chan struct{}),
Ws: &ws.WsStreamClient,
}
for _, arg := range args {
keyData, _ := json.Marshal(&arg)
ws.tradesSubMap.Store(string(keyData), sub)
}
return sub, nil
}
// 批量取消订阅交易 如: ["BTC-USDT","ETH-USDT"]
func (ws *PublicWsStreamClient) UnSubscribeTradesMultiple(instIds []string) error {
args := []WsSubscribeArg{}
for _, s := range instIds {
arg := getTradesSubscribeArg(s)
args = append(args, arg)
}
doSub, err := subscribe[WsActionResult](&ws.WsStreamClient, UNSUBSCRIBE, args)
if err != nil {
return err
}
err = ws.catchSubscribeResult(doSub)
if err != nil {
return err
}
log.Infof("UnSubscribeTrades Success: args:%v", doSub.Args)
for _, arg := range args {
keyData, _ := json.Marshal(&arg)
ws.tradesSubMap.Delete(string(keyData))
}
return nil
}
// 订阅单个交易
func (ws *PublicWsStreamClient) SubscribeTrades(instIds string) (*Subscription[WsTrades], error) {
return ws.SubscribeTradesMultiple([]string{instIds})
}
// 取消订阅单个交易
func (ws *PublicWsStreamClient) UnSubscribeTrades(instIds string) error {
return ws.UnSubscribeTradesMultiple([]string{instIds})
}
func getTradesSubscribeArg(instId string) WsSubscribeArg {
return WsSubscribeArg{
Channel: "trades",
InstId: instId,
}
}