forked from harness/harness
-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel.go
157 lines (128 loc) · 2.9 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package channel
import (
"crypto/rand"
"encoding/json"
"fmt"
"io"
"time"
"code.google.com/p/go.net/websocket"
"github.com/dchest/authcookie"
)
// secret key used to generate tokens
var secret = make([]byte, 32)
func init() {
// generate the secret key by reading
// from crypto/random
if _, err := io.ReadFull(rand.Reader, secret); err != nil {
panic(err)
}
}
// Create will generate a token and create a new
// channel over which messages will be sent.
func Create(name string) string {
mu.Lock()
defer mu.Unlock()
if _, ok := hubs[name]; !ok {
hub := newHub(false, true)
hubs[name] = hub
go hub.run()
}
return authcookie.NewSinceNow(name, 24*time.Hour, secret)
}
// CreateStream will generate a token and create a new
// channel over which messages streams (ie build output)
// are sent.
func CreateStream(name string) string {
mu.Lock()
defer mu.Unlock()
if _, ok := hubs[name]; !ok {
hub := newHub(true, false)
hubs[name] = hub
go hub.run()
}
return authcookie.NewSinceNow(name, 24*time.Hour, secret)
}
// Token will generate a token, but will not create
// a new channel.
func Token(name string) string {
return authcookie.NewSinceNow(name, 24*time.Hour, secret)
}
// Send sends a message on the named channel.
func Send(name string, message string) error {
return SendBytes(name, []byte(message))
}
// SendJSON sends a JSON-encoded value on
// the named channel.
func SendJSON(name string, value interface{}) error {
m, err := json.Marshal(value)
if err != nil {
return err
}
return SendBytes(name, m)
}
// SendBytes send a message in byte format on
// the named channel.
func SendBytes(name string, value []byte) error {
// get the hub for the specified channel name
mu.RLock()
hub, ok := hubs[name]
mu.RUnlock()
if !ok {
return fmt.Errorf("channel does not exist")
}
go hub.Write(value)
return nil
}
func Read(ws *websocket.Conn) {
// get the name from the request
hash := ws.Request().FormValue("token")
// get the hash of the token
name := authcookie.Login(hash, secret)
// get the hub for the specified channel name
mu.RLock()
hub, ok := hubs[name]
mu.RUnlock()
// if hub not found, exit
if !ok {
ws.Close()
return
}
// internal representation of a connection
// maximum queue of 100000 messages
conn := &connection{
send: make(chan string, 100000),
ws: ws,
}
// register the connection with the hub
hub.register <- conn
defer func() {
go func() {
hub.unregister <- conn
}()
closed := <-hub.closed
// this will remove the hub when the connection is
// closed if the
if hub.autoClose && closed {
mu.Lock()
delete(hubs, name)
mu.Unlock()
}
}()
go conn.writer()
conn.reader()
}
func Close(name string) {
// get the hub for the specified channel name
mu.RLock()
hub, ok := hubs[name]
mu.RUnlock()
if !ok {
return
}
// close hub connections
hub.Close()
// remove the hub
mu.Lock()
delete(hubs, name)
mu.Unlock()
}