forked from oxtoacart/flashlight
-
Notifications
You must be signed in to change notification settings - Fork 1
/
statserver.go
135 lines (120 loc) · 2.93 KB
/
statserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package statserver
import (
"encoding/json"
"net/http"
"sync"
"github.com/getlantern/eventsource"
"github.com/getlantern/golog"
)
var (
log = golog.LoggerFor("flashlight.statserver")
)
// Server provides an SSE server that publishes stat updates for peers.
// See (http://www.html5rocks.com/en/tutorials/eventsource/basics/) for more
// about Server-Sent Events.
type Server struct {
Addr string
clients map[int]*Client
clientsMutex sync.RWMutex
clientIdSeq int
peers map[string]*Peer
peersMutex sync.Mutex
}
// Client represents a client connected to the Server
type Client struct {
id int
conn *eventsource.Conn
server *Server
updates chan []byte
}
type Update struct {
Type string `json:"type"`
Data interface{} `json:"data"`
}
func (server *Server) ListenAndServe() error {
server.clients = make(map[int]*Client)
server.peers = make(map[string]*Peer)
httpServer := &http.Server{
Addr: server.Addr,
Handler: eventsource.Handler(server.onNewClient),
}
return httpServer.ListenAndServe()
}
func (server *Server) addClient(conn *eventsource.Conn) *Client {
server.clientsMutex.Lock()
defer server.clientsMutex.Unlock()
id := server.clientIdSeq
server.clientIdSeq = server.clientIdSeq + 1
client := &Client{
id: id,
conn: conn,
server: server,
updates: make(chan []byte, 1000),
}
server.clients[id] = client
return client
}
func (server *Server) removeClient(id int) {
server.clientsMutex.Lock()
defer server.clientsMutex.Unlock()
delete(server.clients, id)
}
func (server *Server) onNewClient(conn *eventsource.Conn) {
client := server.addClient(conn)
for {
select {
case update := <-client.updates:
client.conn.Write(update)
case <-client.conn.CloseNotify():
client.server.removeClient(client.id)
}
}
}
func (server *Server) OnBytesReceived(ip string, bytes int64) {
peer, err := server.getOrCreatePeer(ip)
if err != nil {
log.Errorf("Unable to getOrCreatePeer: %s", err)
return
}
peer.onBytesReceived(bytes)
}
func (server *Server) OnBytesSent(ip string, bytes int64) {
peer, err := server.getOrCreatePeer(ip)
if err != nil {
log.Errorf("Unable to getOrCreatePeer: %s", err)
return
}
peer.onBytesSent(bytes)
}
func (server *Server) getOrCreatePeer(ip string) (*Peer, error) {
server.peersMutex.Lock()
defer server.peersMutex.Unlock()
peer, found := server.peers[ip]
if found {
return peer, nil
}
peer, err := newPeer(ip, server.onPeerUpdate)
if err != nil {
return nil, err
}
server.peers[ip] = peer
return peer, nil
}
func (server *Server) onPeerUpdate(peer *Peer) {
update, err := json.Marshal(&Update{
Type: "peer",
Data: peer,
})
if err != nil {
log.Errorf("Unable to marshal peer update: %s", err)
return
}
server.pushUpdate(update)
}
func (server *Server) pushUpdate(update []byte) {
server.clientsMutex.Lock()
defer server.clientsMutex.Unlock()
for _, client := range server.clients {
client.updates <- update
}
}