forked from yoyoidea/sheep
/
depth.go
executable file
·145 lines (118 loc) · 3.28 KB
/
depth.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
depth.go
Connects to the Binance WebSocket and maintains
local depth cache.
*/
package main
import (
"sync"
"strings"
"log"
"fmt"
"encoding/json"
"github.com/gorilla/websocket"
"go-binance/binance"
)
const (
MaxDepth = 100 // Size of order book
MaxQueue = 100 // Size of message queue
)
// Message received from websocket
type State struct {
EventType string `json:"e"`
EventTime int64 `json:"E"`
Symbol string `json:"s"`
UpdateId int64 `json:"u"`
BidDelta []binance.Order `json:"b"`
AskDelta []binance.Order `json:"a"`
}
// Orderbook structure
type OrderBook struct {
Bids map[float64]float64 // Map of all bids, key->price, value->quantity
BidMutex sync.Mutex // Threadsafe
Asks map[float64]float64 // Map of all asks, key->price, value->quantity
AskMutex sync.Mutex // Threadsafe
Updates chan State // Channel of all state updates
}
// Process all incoming bids
func (o *OrderBook) ProcessBids(bids []binance.Order) {
for _, bid := range bids {
o.BidMutex.Lock()
if bid.Quantity == 0 {
delete(o.Bids, bid.Price)
} else {
o.Bids[bid.Price] = bid.Quantity
}
o.BidMutex.Unlock()
}
}
// Process all incoming asks
func (o *OrderBook) ProcessAsks(asks []binance.Order) {
for _, ask := range asks {
o.AskMutex.Lock()
if ask.Quantity == 0 {
delete(o.Asks, ask.Price)
} else {
o.Asks[ask.Price] = ask.Quantity
}
o.AskMutex.Unlock()
}
}
// Hands off incoming messages to processing functions
func (o *OrderBook) Maintainer() {
for {
select {
case job := <- o.Updates:
if len(job.BidDelta) > 0 {
go o.ProcessBids(job.BidDelta)
}
if len(job.AskDelta) > 0 {
go o.ProcessAsks(job.AskDelta)
}
}
}
}
func main() {
symbol := "ethbtc"
address := fmt.Sprintf("wss://stream.binance.com:9443/ws/%s@depth", symbol)
// Connect to websocket
var wsDialer websocket.Dialer
wsConn, _, err := wsDialer.Dial(address, nil)
if err != nil {
panic(err)
}
defer wsConn.Close()
log.Println("Dialed:", address)
// Set up Order Book
ob := OrderBook{}
ob.Bids = make(map[float64]float64, MaxDepth)
ob.Asks = make(map[float64]float64, MaxDepth)
ob.Updates = make(chan State, 500)
// Get initial state of orderbook from rest api
client := binance.New("", "")
query := binance.OrderBookQuery {
Symbol: strings.ToUpper(symbol),
}
orderBook, err := client.GetOrderBook(query)
if err != nil {
panic(err)
}
ob.ProcessBids(orderBook.Bids)
ob.ProcessAsks(orderBook.Asks)
// Start maintaining order book
go ob.Maintainer()
// Read & Process Messages from wss stream
for {
_, message, err := wsConn.ReadMessage()
if err != nil {
log.Println("[ERROR] ReadMessage:", err)
}
msg := State{}
err = json.Unmarshal(message, &msg)
if err != nil {
log.Println("[ERROR] Parsing:", err)
continue
}
ob.Updates <- msg
}
}