// ohlcv.go

package ws
import (
"errors"
"sync"
)
// ohlcvSocket is the package-level instance handed out by GetOHLCVSocket.
// It starts out nil; GetOHLCVSocket assigns it the first time it finds it nil.
var ohlcvSocket *OHLCVSocket
// OHLCVSocket records which clients are subscribed to which channel IDs.
// The methods in this file lock mu before reading or writing either map.
type OHLCVSocket struct {
// subscriptions maps a channel ID to its clients. Subscribe stores true for
// a subscribed client and the unsubscribe methods delete that entry again.
subscriptions map[string]map[*Client]bool
// subscriptionsList maps a client to the channel IDs that were passed to
// Subscribe for it, in call order.
subscriptionsList map[*Client][]string
// mu guards subscriptions and subscriptionsList.
mu sync.Mutex
}
// NewOHLCVSocket returns a new OHLCVSocket whose two subscription maps are
// allocated and empty, so it can be passed straight to Subscribe.
func NewOHLCVSocket() *OHLCVSocket {
	socket := &OHLCVSocket{mu: sync.Mutex{}}
	socket.subscriptions = map[string]map[*Client]bool{}
	socket.subscriptionsList = map[*Client][]string{}
	return socket
}
// ohlcvSocketInitMu serializes the lazy creation performed by GetOHLCVSocket.
var ohlcvSocketInitMu sync.Mutex

// GetOHLCVSocket returns the package-wide OHLCVSocket, creating it with
// NewOHLCVSocket the first time it is needed.
//
// The nil check and the assignment happen under ohlcvSocketInitMu so that
// callers racing on the first call all receive the same instance. A mutex is
// used rather than sync.Once so that the instance is still re-created if
// ohlcvSocket is ever reset to nil, as the unguarded version did.
func GetOHLCVSocket() *OHLCVSocket {
	ohlcvSocketInitMu.Lock()
	defer ohlcvSocketInitMu.Unlock()

	if ohlcvSocket == nil {
		ohlcvSocket = NewOHLCVSocket()
	}
	return ohlcvSocket
}
// Subscribe registers client c on channelID so that BroadcastOHLCV for that
// channel reaches it, and records channelID in the client's channel list so
// that Unsubscribe can later find it.
//
// Subscribing the same client to the same channel more than once is a no-op
// after the first call: the channel ID is recorded only once, so the list does
// not grow with repeated subscribe requests.
//
// It returns an error when c is nil and nil otherwise.
func (s *OHLCVSocket) Subscribe(channelID string, c *Client) error {
	// Reject bad input before taking the lock; the check touches no shared state.
	if c == nil {
		// The message text is kept as-is because callers may forward it.
		return errors.New("No connection found")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Allocate lazily so a zero-value OHLCVSocket does not panic on map writes.
	if s.subscriptions == nil {
		s.subscriptions = make(map[string]map[*Client]bool)
	}
	if s.subscriptionsList == nil {
		s.subscriptionsList = make(map[*Client][]string)
	}

	if s.subscriptions[channelID] == nil {
		s.subscriptions[channelID] = make(map[*Client]bool)
	}
	s.subscriptions[channelID][c] = true

	// Record the channel for this client only if it is not already listed.
	for _, id := range s.subscriptionsList[c] {
		if id == channelID {
			return nil
		}
	}
	s.subscriptionsList[c] = append(s.subscriptionsList[c], channelID)
	return nil
}
// UnsubscribeChannelHandler returns a callback bound to channelID. Calling the
// callback with a client invokes UnsubscribeChannel(channelID, client); nothing
// is unsubscribed until the callback runs.
func (s *OHLCVSocket) UnsubscribeChannelHandler(channelID string) func(c *Client) {
	handler := func(client *Client) {
		s.UnsubscribeChannel(channelID, client)
	}
	return handler
}
// UnsubscribeHandler returns a callback that, when called with a client,
// invokes Unsubscribe for that client on this socket.
func (s *OHLCVSocket) UnsubscribeHandler() func(c *Client) {
	// The bound method value has exactly the callback's signature.
	return s.Unsubscribe
}
// UnsubscribeChannel removes client c from channelID only; the client's other
// channels are left untouched. Calling it for a channel or client that is not
// registered is a no-op.
//
// Both indexes are updated together: the client is dropped from the channel's
// subscriber set, and channelID is dropped from the client's channel list, so
// the list never names a channel the client has already left. Sets and lists
// that become empty are deleted rather than kept around.
func (s *OHLCVSocket) UnsubscribeChannel(channelID string, c *Client) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if subscribers, ok := s.subscriptions[channelID]; ok {
		delete(subscribers, c)
		if len(subscribers) == 0 {
			delete(s.subscriptions, channelID)
		}
	}

	channelIDs, ok := s.subscriptionsList[c]
	if !ok {
		return
	}
	// Filter in place, dropping every occurrence of channelID (older entries
	// may contain duplicates).
	kept := channelIDs[:0]
	for _, id := range channelIDs {
		if id != channelID {
			kept = append(kept, id)
		}
	}
	if len(kept) == 0 {
		delete(s.subscriptionsList, c)
		return
	}
	s.subscriptionsList[c] = kept
}
// Unsubscribe removes client c from every channel recorded for it and then
// forgets the client entirely. Calling it for a client with no recorded
// channels is a no-op.
//
// The client's entry in subscriptionsList is deleted as well; otherwise the map
// would keep a reference to every client that ever subscribed, and those
// clients could never be garbage collected. Channel sets that become empty are
// deleted for the same reason.
func (s *OHLCVSocket) Unsubscribe(c *Client) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, id := range s.subscriptionsList[c] {
		subscribers := s.subscriptions[id]
		// delete is a no-op on a nil map or a missing key, so channels the
		// client has already left need no special case.
		delete(subscribers, c)
		if len(subscribers) == 0 {
			delete(s.subscriptions, id)
		}
	}
	delete(s.subscriptionsList, c)
}
// BroadcastOHLCV passes payload p to SendUpdateMessage for every client whose
// entry under channelID is true. A channel with no entries sends nothing.
//
// mu is held for the whole loop, including the sends. The returned error is
// always nil.
func (s *OHLCVSocket) BroadcastOHLCV(channelID string, p interface{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	subscribers := s.subscriptions[channelID]
	for client, active := range subscribers {
		if !active {
			continue
		}
		s.SendUpdateMessage(client, p)
	}
	return nil
}
// SendMessage forwards a message to client c by calling c.SendMessage,
// passing OHLCVChannel, msgType and p through unchanged.
// It does not check c for nil and does not touch the subscription maps.
func (s *OHLCVSocket) SendMessage(c *Client, msgType string, p interface{}) {
c.SendMessage(OHLCVChannel, msgType, p)
}
// SendErrorMessage sends payload p to client c on the OHLCV channel with the
// message type "ERROR".
func (s *OHLCVSocket) SendErrorMessage(c *Client, p interface{}) {
	s.SendMessage(c, "ERROR", p)
}
// SendInitMessage sends payload p to client c on the OHLCV channel with the
// message type "INIT".
func (s *OHLCVSocket) SendInitMessage(c *Client, p interface{}) {
	s.SendMessage(c, "INIT", p)
}
// SendUpdateMessage sends payload p to client c on the OHLCV channel with the
// message type "UPDATE". BroadcastOHLCV calls it for each subscribed client.
func (s *OHLCVSocket) SendUpdateMessage(c *Client, p interface{}) {
	s.SendMessage(c, "UPDATE", p)
}