/
prices_subscriber.go
138 lines (109 loc) · 3.9 KB
/
prices_subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package bybit
import (
"context"
"fmt"
exchanges "github.com/aulaleslie/trade-exchanges"
"github.com/aulaleslie/trade-exchanges/utils"
"github.com/hirokisan/bybit/v2"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// SubscribeToPrices subscribes to the Bybit V5 public spot ticker for symbol
// and returns a channel of price events.
//
// Each ticker message with a non-empty LastPrice is forwarded as a PriceEvent
// whose Payload holds the parsed price. If the subscription cannot be
// registered, or the websocket service later reports an error, a PriceEvent
// with DisconnectedWithErr set is sent and the channel is closed; reconnecting
// is left to the caller.
//
// ctx is handed to the websocket service and also guards every send on the
// returned channel, so a cancelled ctx never leaves a sender blocked on a
// consumer that has stopped reading.
//
// A non-nil error is returned only when the V5 public service cannot be
// created; in that case the returned channel is nil.
func SubscribeToPrices(ctx context.Context, wsClient *bybit.WebSocketClient, lg *zap.Logger, symbol string) (<-chan exchanges.PriceEvent, error) {
	svc, err := wsClient.V5().Public(bybit.CategoryV5Spot)
	if err != nil {
		return nil, errors.Wrap(err, "unable to create V5 service")
	}

	out := make(chan exchanges.PriceEvent, 100)

	_, err = svc.SubscribeTicker(
		bybit.V5WebsocketPublicTickerParamKey{
			Symbol: bybit.SymbolV5(symbol),
		},
		func(response bybit.V5WebsocketPublicTickerResponse) error {
			priceData := response.Data.Spot
			if priceData.LastPrice == "" {
				return nil
			}
			// Give up on the send when ctx is cancelled instead of blocking
			// the websocket reader forever on a full channel.
			select {
			case out <- exchanges.PriceEvent{Payload: utils.FromString(priceData.LastPrice)}:
			case <-ctx.Done():
			}
			// The callback has no failure of its own to report; it must not
			// leak the enclosing function's err variable.
			return nil
		})
	if err != nil {
		lg.Info("registration connection", zap.String("symbol", symbol), zap.Error(err))
		// Nothing was subscribed, so there is no point in starting the
		// service; release it instead of leaking the connection.
		if closeErr := svc.Close(); closeErr != nil {
			lg.Warn("closing ticker service after failed registration", zap.String("symbol", symbol), zap.Error(closeErr))
		}
		// The channel is empty and buffered, so this send cannot block.
		out <- exchanges.PriceEvent{DisconnectedWithErr: fmt.Errorf("connection error %w", err)}
		close(out)
		return out, nil
	}

	errHandler := func(isWebsocketClosed bool, connErr error) {
		// Connection issue (timeout, etc.). The connection is dead at this
		// point; tell the consumer so it can reconnect.
		lg.Info("err handler called",
			zap.String("symbol", symbol),
			zap.Bool("websocketClosed", isWebsocketClosed),
			zap.Error(connErr))
		select {
		case out <- exchanges.PriceEvent{DisconnectedWithErr: fmt.Errorf("connection error %w", connErr)}:
		case <-ctx.Done():
		}
		// NOTE(review): closing here assumes the library invokes errHandler
		// from the goroutine that runs the ticker callback, after the last
		// callback invocation — confirm against hirokisan/bybit's Start.
		close(out)
	}

	go func() {
		// Run under the caller's ctx so cancelling it stops the service.
		if startErr := svc.Start(ctx, errHandler); startErr != nil {
			lg.Info("ticker service stopped", zap.String("symbol", symbol), zap.Error(startErr))
		}
	}()

	return out, nil
}
// SubscribeToPricesInverse subscribes to the Bybit V5 public inverse-contract
// ticker for symbol and returns a channel of price events.
//
// Each ticker message with a non-empty LastPrice is forwarded as a PriceEvent
// whose Payload holds the parsed price. If the subscription cannot be
// registered, or the websocket service later reports an error, a PriceEvent
// with DisconnectedWithErr set is sent and the channel is closed; reconnecting
// is left to the caller.
//
// ctx is handed to the websocket service and also guards every send on the
// returned channel, so a cancelled ctx never leaves a sender blocked on a
// consumer that has stopped reading.
//
// A non-nil error is returned only when the V5 public service cannot be
// created; in that case the returned channel is nil.
func SubscribeToPricesInverse(ctx context.Context, wsClient *bybit.WebSocketClient, lg *zap.Logger, symbol string) (<-chan exchanges.PriceEvent, error) {
	svc, err := wsClient.V5().Public(bybit.CategoryV5Inverse)
	if err != nil {
		return nil, errors.Wrap(err, "unable to create V5 service")
	}

	out := make(chan exchanges.PriceEvent, 100)

	_, err = svc.SubscribeTicker(
		bybit.V5WebsocketPublicTickerParamKey{
			Symbol: bybit.SymbolV5(symbol),
		},
		func(response bybit.V5WebsocketPublicTickerResponse) error {
			priceData := response.Data.LinearInverse
			if priceData.LastPrice == "" {
				return nil
			}
			// Give up on the send when ctx is cancelled instead of blocking
			// the websocket reader forever on a full channel.
			select {
			case out <- exchanges.PriceEvent{Payload: utils.FromString(priceData.LastPrice)}:
			case <-ctx.Done():
			}
			// The callback has no failure of its own to report; it must not
			// leak the enclosing function's err variable.
			return nil
		})
	if err != nil {
		lg.Info("registration connection", zap.String("symbol", symbol), zap.Error(err))
		// Nothing was subscribed, so there is no point in starting the
		// service; release it instead of leaking the connection.
		if closeErr := svc.Close(); closeErr != nil {
			lg.Warn("closing ticker service after failed registration", zap.String("symbol", symbol), zap.Error(closeErr))
		}
		// The channel is empty and buffered, so this send cannot block.
		out <- exchanges.PriceEvent{DisconnectedWithErr: fmt.Errorf("connection error %w", err)}
		close(out)
		return out, nil
	}

	errHandler := func(isWebsocketClosed bool, connErr error) {
		// Connection issue (timeout, etc.). The connection is dead at this
		// point; tell the consumer so it can reconnect.
		lg.Info("err handler called",
			zap.String("symbol", symbol),
			zap.Bool("websocketClosed", isWebsocketClosed),
			zap.Error(connErr))
		select {
		case out <- exchanges.PriceEvent{DisconnectedWithErr: fmt.Errorf("connection error %w", connErr)}:
		case <-ctx.Done():
		}
		// NOTE(review): closing here assumes the library invokes errHandler
		// from the goroutine that runs the ticker callback, after the last
		// callback invocation — confirm against hirokisan/bybit's Start.
		close(out)
	}

	go func() {
		// Run under the caller's ctx so cancelling it stops the service.
		if startErr := svc.Start(ctx, errHandler); startErr != nil {
			lg.Info("ticker service stopped", zap.String("symbol", symbol), zap.Error(startErr))
		}
	}()

	return out, nil
}
// SubscribeToPricesLinear subscribes to the Bybit V5 public linear-contract
// ticker for symbol and returns a channel of price events.
//
// Each ticker message with a non-empty LastPrice is forwarded as a PriceEvent
// whose Payload holds the parsed price. If the subscription cannot be
// registered, or the websocket service later reports an error, a PriceEvent
// with DisconnectedWithErr set is sent and the channel is closed; reconnecting
// is left to the caller.
//
// ctx is handed to the websocket service and also guards every send on the
// returned channel, so a cancelled ctx never leaves a sender blocked on a
// consumer that has stopped reading.
//
// A non-nil error is returned only when the V5 public service cannot be
// created; in that case the returned channel is nil.
func SubscribeToPricesLinear(ctx context.Context, wsClient *bybit.WebSocketClient, lg *zap.Logger, symbol string) (<-chan exchanges.PriceEvent, error) {
	svc, err := wsClient.V5().Public(bybit.CategoryV5Linear)
	if err != nil {
		return nil, errors.Wrap(err, "unable to create V5 service")
	}

	out := make(chan exchanges.PriceEvent, 100)

	_, err = svc.SubscribeTicker(
		bybit.V5WebsocketPublicTickerParamKey{
			Symbol: bybit.SymbolV5(symbol),
		},
		func(response bybit.V5WebsocketPublicTickerResponse) error {
			priceData := response.Data.LinearInverse
			if priceData.LastPrice == "" {
				return nil
			}
			// Give up on the send when ctx is cancelled instead of blocking
			// the websocket reader forever on a full channel.
			select {
			case out <- exchanges.PriceEvent{Payload: utils.FromString(priceData.LastPrice)}:
			case <-ctx.Done():
			}
			// The callback has no failure of its own to report; it must not
			// leak the enclosing function's err variable.
			return nil
		})
	if err != nil {
		lg.Info("registration connection", zap.String("symbol", symbol), zap.Error(err))
		// Nothing was subscribed, so there is no point in starting the
		// service; release it instead of leaking the connection.
		if closeErr := svc.Close(); closeErr != nil {
			lg.Warn("closing ticker service after failed registration", zap.String("symbol", symbol), zap.Error(closeErr))
		}
		// The channel is empty and buffered, so this send cannot block.
		out <- exchanges.PriceEvent{DisconnectedWithErr: fmt.Errorf("connection error %w", err)}
		close(out)
		return out, nil
	}

	errHandler := func(isWebsocketClosed bool, connErr error) {
		// Connection issue (timeout, etc.). The connection is dead at this
		// point; tell the consumer so it can reconnect.
		lg.Info("err handler called",
			zap.String("symbol", symbol),
			zap.Bool("websocketClosed", isWebsocketClosed),
			zap.Error(connErr))
		select {
		case out <- exchanges.PriceEvent{DisconnectedWithErr: fmt.Errorf("connection error %w", connErr)}:
		case <-ctx.Done():
		}
		// NOTE(review): closing here assumes the library invokes errHandler
		// from the goroutine that runs the ticker callback, after the last
		// callback invocation — confirm against hirokisan/bybit's Start.
		close(out)
	}

	go func() {
		// Run under the caller's ctx so cancelling it stops the service.
		if startErr := svc.Start(ctx, errHandler); startErr != nil {
			lg.Info("ticker service stopped", zap.String("symbol", symbol), zap.Error(startErr))
		}
	}()

	return out, nil
}