/
pusher.go
145 lines (129 loc) · 3.12 KB
/
pusher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package server
import (
"errors"
"math/rand"
"net/url"
"sort"
"github.com/golang/glog"
"github.com/gorilla/websocket"
)
// Slot keep the connection for a server
type Slot struct {
addr string
path string
connections []*Client
dialer *websocket.Dialer
}
func (s *Slot) Len() int {
return len(s.connections)
}
func (s *Slot) Less(i, j int) bool {
return !s.connections[i].dead
}
func (s *Slot) Swap(i, j int) {
s.connections[i], s.connections[j] = s.connections[j], s.connections[i]
}
func (s *Slot) newClient() (*Client, error) {
u := url.URL{Scheme: "ws", Host: s.addr, Path: s.path}
conn, _, err := s.dialer.Dial(u.String(), nil)
if err != nil {
return nil, err
}
client := &Client{hub: nil, conn: conn, send: make(chan []byte, 256), dead: false, retry: 0, ping: true}
go client.writePump()
go client.readPump()
s.connections = append(s.connections, client)
return client, nil
}
func (s *Slot) getClient() (*Client, error) {
// clean dead connections
sort.Sort(s)
count := 0
for _, v := range s.connections {
if !v.dead {
count++
} else {
break
}
}
s.connections = s.connections[:count]
// get a connection
if count == 0 {
return s.newClient()
}
return s.connections[rand.Int()%count], nil
}
// ClientPool keep the connections for pushing data
// TODO: perference support
type ClientPool struct {
slots []*Slot
dialer *websocket.Dialer
path string
}
func newClientPool(servers []string, path string, preconnect bool) *ClientPool {
p := &ClientPool{
dialer: &websocket.Dialer{HandshakeTimeout: connectTimeout},
path: path,
}
for _, server := range servers {
slot := &Slot{dialer: p.dialer, path: p.path, addr: server}
p.slots = append(p.slots, slot)
if preconnect {
_, err := slot.getClient()
if err != nil {
glog.Warningf("[pool] preconnect to %v%v failed: %v", slot.addr, slot.path, err)
}
}
}
return p
}
func (p *ClientPool) getClient() (*Client, error) {
length := len(p.slots)
if length == 0 {
return nil, errors.New("no server list")
}
pos := rand.Int() % len(p.slots)
for i := 0; i < length; i++ {
pos = (pos + i) % len(p.slots)
slot := p.slots[pos]
client, err := slot.getClient()
if err != nil {
glog.Warningf("[pool] get client of %v/%v failed: %v", slot.addr, slot.path, err)
continue
} else {
return client, nil
}
}
return nil, errors.New("[pool] all servers failed")
}
// Pusher always push the message to one server in the pool
// TODO: a instance of special Client may be a cute implementation
type Pusher struct {
pool *ClientPool
send chan []byte
}
func newPusher(servers []string, path string, preconnect bool) *Pusher {
p := &Pusher{
pool: newClientPool(servers, path, preconnect),
send: make(chan []byte, 256),
}
go p.Run()
return p
}
// Run start the push process
func (p *Pusher) Run() {
for {
m := <-p.send
client, err := p.pool.getClient()
if err != nil {
glog.Warningf("[pusher] push message failed: %v", err)
} else {
select {
case client.send <- m:
default:
close(client.send)
glog.Warningf("[pusher] push message to %v failed: queue full", client.conn.RemoteAddr())
}
}
}
}