/
websocket.go
201 lines (179 loc) · 5.34 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// Copyright (c) 2019, The Decred-Next developers
// See LICENSE for details.
package exchanges
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/carterjones/signalr"
"github.com/carterjones/signalr/hubs"
"github.com/gorilla/websocket"
)
const (
	// wsWriteTimeout is a five-second duration. Going by its name it bounds
	// how long a websocket write may take.
	// NOTE(review): confirm against the code that applies it.
	wsWriteTimeout = 5 * time.Second
	// fauxBrowserUA is a browser-like User-Agent string. newSignalrConnection
	// sets it as the "User-Agent" header of the signalr client.
	fauxBrowserUA = "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
)
// websocketFeed is an interface that wraps the websocket connection so that a
// substitute implementation can be supplied for testing.
type websocketFeed interface {
	// Done should return a channel that will be closed when there is a
	// disconnection.
	Done() chan struct{}
	// Read will block until a message is received or an error occurs. Use
	// Close from another goroutine to force a disconnection error.
	Read() ([]byte, error)
	// Write sends a message. Implementations are expected to safely sequence
	// messages written from multiple goroutines.
	Write(interface{}) error
	// Close will disconnect, causing any pending Read operations to error out.
	Close()
}
// The socketConfig is the configuration passed to newSocketConnection. It is
// just an address for now, but enables more customized settings as exchanges'
// websocket protocols are eventually implemented.
type socketConfig struct {
	// address is the URL handed to the websocket dialer by
	// newSocketConnection.
	address string
}
// socketClient is a manager for a gorilla websocket connection.
// Satisfies websocketFeed interface.
type socketClient struct {
	// mtx is held by Write and Close, so those two operations never run
	// concurrently on the same client.
	mtx sync.Mutex
	// on is set to true by newSocketConnection and to false by Close. Close
	// does nothing when on is already false.
	on bool
	// conn is the underlying gorilla websocket connection.
	conn *websocket.Conn
	// done is created by newSocketConnection, closed by Close, and handed out
	// by Done.
	done chan struct{}
}
// Read hands back the payload of the next message from gorilla's ReadMessage,
// discarding the message type. Satisfies websocketFeed.Read.
func (client *socketClient) Read() (msg []byte, err error) {
	_, payload, readErr := client.conn.ReadMessage()
	return payload, readErr
}
// Write JSON-encodes msg and sends it as a websocket text message through
// gorilla's WriteMessage. Satisfies websocketFeed.Write.
//
// The message is marshaled before the mutex is taken, so only the network
// write itself is sequenced between goroutines. Every write is given a
// deadline of wsWriteTimeout: Close acquires the same mutex, so a write that
// stalled indefinitely on an unresponsive peer would also keep Close from
// forcing a disconnection.
//
// An error is returned if msg cannot be marshaled, if the deadline cannot be
// set, or if the write fails or times out.
func (client *socketClient) Write(msg interface{}) error {
	payload, err := json.Marshal(msg)
	if err != nil {
		return err
	}

	client.mtx.Lock()
	defer client.mtx.Unlock()

	// Bound the write so the mutex is never held forever.
	err = client.conn.SetWriteDeadline(time.Now().Add(wsWriteTimeout))
	if err != nil {
		return err
	}
	return client.conn.WriteMessage(websocket.TextMessage, payload)
}
// Done exposes the client's done channel, which Close closes when the
// connection is shut down. Satisfies websocketFeed.Done.
func (client *socketClient) Done() chan struct{} {
	closed := client.done
	return closed
}
// Close is a wrapper for gorilla's Close that satisfies websocketFeed.Close.
// The first call marks the client as off, closes the done channel and closes
// the underlying connection; later calls do nothing.
func (client *socketClient) Close() {
	client.mtx.Lock()
	defer client.mtx.Unlock()

	if client.on {
		client.on = false
		close(client.done)
		client.conn.Close()
	}
}
// newSocketConnection dials cfg.address and wraps the resulting connection in
// a socketClient, returned as a websocketFeed. A dial failure is returned
// unchanged together with a nil feed.
func newSocketConnection(cfg *socketConfig) (websocketFeed, error) {
	dialer := websocket.Dialer{
		// Same proxy setting as DefaultDialer.
		Proxy: http.ProxyFromEnvironment,
		// DefaultDialer is 45 seconds.
		HandshakeTimeout: 10 * time.Second,
	}

	wsConn, _, dialErr := dialer.Dial(cfg.address, nil)
	if dialErr != nil {
		return nil, dialErr
	}

	client := &socketClient{
		on:   true,
		conn: wsConn,
		done: make(chan struct{}),
	}
	return client, nil
}
// dumpSignalrMsg prints every field of a signalr.Message to standard output
// between two separator lines. The C, S, G, I and E fields go through jsonify;
// the R, H, D and T fields are printed from their own MarshalJSON output, with
// any marshaling error ignored. Each hub message in M is listed with its
// method name followed by its arguments.
func dumpSignalrMsg(msg signalr.Message) {
	const separator = "================================="

	fmt.Println(separator)

	fmt.Printf("C: %s\n", jsonify(msg.C))
	fmt.Printf("S: %s\n", jsonify(msg.S))
	fmt.Printf("G: %s\n", jsonify(msg.G))
	fmt.Printf("I: %s\n", jsonify(msg.I))
	fmt.Printf("E: %s\n", jsonify(msg.E))

	// Marshaling errors are deliberately dropped here.
	rawR, _ := msg.R.MarshalJSON()
	fmt.Printf("R: %s\n", string(rawR))
	rawH, _ := msg.H.MarshalJSON()
	fmt.Printf("H: %s\n", string(rawH))
	rawD, _ := msg.D.MarshalJSON()
	fmt.Printf("D: %s\n", string(rawD))
	rawT, _ := msg.T.MarshalJSON()
	fmt.Printf("T: %s\n", string(rawT))

	for _, call := range msg.M {
		fmt.Printf(" M: %s\n", call.M)
		for _, param := range call.A {
			fmt.Printf(" A: %s\n", jsonify(param))
		}
	}

	fmt.Println(separator)
}
// signalrClient is the interface for a signalr connection. It is the type
// returned by newSignalrConnection.
type signalrClient interface {
	// Send transmits a hub client message on the connection.
	Send(hubs.ClientMsg) error
	// Close shuts the connection down.
	Close()
}
// signalrConfig holds the settings consumed by newSignalrConnection. The
// first five fields are passed, in this order, to signalr.New; the two
// handlers are passed to the client's Run method.
type signalrConfig struct {
	host           string
	protocol       string
	endpoint       string
	connectionData string
	params         map[string]string
	// msgHandler receives incoming messages: func(msg signalr.Message).
	msgHandler signalr.MsgHandler
	// errHandler receives errors: func(err error).
	errHandler signalr.ErrHandler
}
// signalrConnection is a wrapper for the signalr.Client. Satisfies
// signalrClient.
type signalrConnection struct {
	// c is the wrapped signalr client.
	c *signalr.Client
	// mtx is held by Send and Close, so those two operations never run
	// concurrently on the same connection.
	mtx sync.Mutex
	// on is set to true by newSignalrConnection and to false by Close. Close
	// does nothing when on is already false.
	on bool
}
// Send forwards msg to the wrapped signalr client and returns whatever error
// that client reports. The connection mutex is held for the duration of the
// call, so concurrent Sends are sequenced.
func (conn *signalrConnection) Send(msg hubs.ClientMsg) error {
	conn.mtx.Lock()
	defer conn.mtx.Unlock()

	sendErr := conn.c.Send(msg)
	return sendErr
}
// Close shuts down the wrapped signalr client the first time it is called;
// later calls do nothing.
func (conn *signalrConnection) Close() {
	conn.mtx.Lock()
	defer conn.mtx.Unlock()

	// The underlying Close can block, so the on flag is used to prevent
	// calling it again on a connection that has already been closed.
	if conn.on {
		conn.on = false
		conn.c.Close()
	}
}
// newSignalrConnection builds a signalr client from cfg, gives it a
// browser-like User-Agent header and starts it with the configured message
// and error handlers. The result is returned as the signalrClient interface
// rather than the concrete signalrConnection. If the client fails to start,
// that error is returned together with a nil client.
func newSignalrConnection(cfg *signalrConfig) (signalrClient, error) {
	client := signalr.New(cfg.host, cfg.protocol, cfg.endpoint, cfg.connectionData, cfg.params)

	// Present a user agent that looks like a browser.
	client.Headers["User-Agent"] = fauxBrowserUA

	// Start the connection.
	if err := client.Run(cfg.msgHandler, cfg.errHandler); err != nil {
		return nil, err
	}

	wrapped := &signalrConnection{
		c:  client,
		on: true,
	}
	return wrapped, nil
}