forked from harness/harness
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hub.go
133 lines (113 loc) · 2.74 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package channel
import (
	"log"
	"sync"
)
// Package-level hub registry.
var (
	// mu is the lock used for access to the hubs map below.
	mu sync.RWMutex

	// hubs holds every hub, keyed by a string identifier. Each hub is a
	// separate channel that a set of users can listen on: for example,
	// one hub may stream build output for github.com/foo/bar while
	// another posts updates for the user octocat.
	hubs = make(map[string]*hub)
)
// hub fans messages out to a set of registered connections. Its run loop
// owns the connection set and the history; everything else reaches the
// hub through the channels declared here.
type hub struct {
	// connections is the set of currently registered connections.
	connections map[*connection]bool

	// broadcast carries messages that are to be delivered to every
	// registered connection.
	broadcast chan string

	// register carries requests to add a connection to the hub;
	// unregister carries requests to remove one.
	register, unregister chan *connection

	// history is the buffer of data sent so far. It is used mostly for
	// build output: a client may connect after the build has already
	// started, in which case the earlier output has to be streamed to it.
	history []string

	// close receives the "shutdown" signal. closed is the channel the hub
	// answers on: it sends true when it has shut down and false when it
	// is still running.
	close, closed chan bool

	// autoClose shuts the hub down once its last connection is removed.
	// sendHistory turns on recording of broadcast messages into history.
	autoClose, sendHistory bool
}
// newHub allocates a hub with an empty connection set, an empty history
// and unbuffered channels. sendHistory controls whether broadcast messages
// are recorded for replay; autoClose controls whether the hub shuts down
// when its last connection is removed. The caller is responsible for
// starting the hub's run loop.
func newHub(sendHistory, autoClose bool) *hub {
	return &hub{
		connections: make(map[*connection]bool),
		broadcast:   make(chan string),
		register:    make(chan *connection),
		unregister:  make(chan *connection),
		close:       make(chan bool),
		closed:      make(chan bool),
		// Starts empty with no capacity reserved; it grows via append.
		history:     []string{},
		autoClose:   autoClose,
		sendHistory: sendHistory,
	}
}
// sendHistory replays history, in order, onto c.send. Each send blocks
// until the connection takes the message. An empty or nil history is a
// no-op.
//
// The hub starts this function in its own goroutine and closes c.send when
// the connection is unregistered or the hub is closed, which can happen
// while a replay is still in flight. Sending on a closed channel panics,
// and an unrecovered panic in this goroutine would terminate the whole
// process, so the panic is recovered here and the replay simply stops.
func sendHistory(c *connection, history []string) {
	defer func() {
		// Best effort: the client went away mid-replay, so there is
		// nobody left to deliver the remaining history to.
		_ = recover()
	}()
	for _, msg := range history {
		c.send <- msg
	}
}
// run is the hub's event loop. It serves register, unregister, broadcast
// and close requests one at a time until the hub shuts down, and is the
// only place where the connection set and the history are modified.
//
// The loop ends when a close request arrives, when autoClose is set and
// the last connection is unregistered, or when a panic is recovered.
func (h *hub) run() {
	// Make sure a nil pointer or some other unexpected failure inside the
	// loop cannot bring down the application. The panic is logged rather
	// than discarded: once it fires this loop has exited and nothing
	// services the hub's channels any more, so the event must be visible.
	defer func() {
		if r := recover(); r != nil {
			log.Printf("channel: hub stopped after panic: %v", r)
		}
	}()

	for {
		select {
		case c := <-h.register:
			h.connections[c] = true
			// A late joiner is sent everything recorded so far. The replay
			// works on a private copy in its own goroutine, so a slow
			// client does not stall this loop.
			if len(h.history) > 0 {
				backlog := make([]string, len(h.history))
				copy(backlog, h.history)
				go sendHistory(c, backlog)
			}

		case c := <-h.unregister:
			delete(h.connections, c)
			close(c.send)
			// Always answer on closed so the caller learns whether this
			// removal shut the hub down; only then stop the loop.
			shutdown := h.autoClose && len(h.connections) == 0
			h.closed <- shutdown
			if shutdown {
				return
			}

		case m := <-h.broadcast:
			if h.sendHistory {
				h.history = append(h.history, m)
			}
			for c := range h.connections {
				select {
				case c.send <- m:
					// delivered
				default:
					// The connection cannot take the message right now:
					// drop it from the hub and close its socket in the
					// background instead of blocking the other clients.
					delete(h.connections, c)
					go c.ws.Close()
				}
			}

		case <-h.close:
			// Shutdown request: drop every connection, close its send
			// channel, then report that the hub is closed.
			for c := range h.connections {
				delete(h.connections, c)
				close(c.send)
			}
			h.closed <- true
			return
		}
	}
}
// Close asks the hub to shut down by sending on its close channel. The
// call blocks until the signal has been received.
func (h *hub) Close() {
	const shutdown = true
	h.close <- shutdown
}
// Write submits p, converted to a string, as a single message on the
// hub's broadcast channel. It blocks until the message has been handed
// over, then reports all of p as written; the returned error is always
// nil.
func (h *hub) Write(p []byte) (n int, err error) {
	msg := string(p)
	h.broadcast <- msg
	return len(p), nil
}