/
hub.go
117 lines (102 loc) · 2.27 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main
import (
"fmt"
"log"
"time"
r "gopkg.in/dancannon/gorethink.v2"
)
// hub is the central dispatcher: it owns the set of live channels and the
// RethinkDB session used for publishing and for the changefeed.
type hub struct {
// queue carries the commands (SUBSCRIBE, PUBLISH, REMOVE) consumed by run.
queue queue
// channels holds the live channels, keyed by command path.
channels channels
// ticker is created from pingPeriod in newHub and stopped when run returns.
ticker *mTicker
// session is the RethinkDB session; it is nil until run connects.
session *r.Session
}
// record is the value decoded from each changefeed entry of the "pinghub"
// table. Entries with an empty Id or Text are ignored by the hub.
type record struct {
	// Id is matched against the hub's channel keys to pick the target channel.
	Id string
	// Text is the payload broadcast to that channel.
	Text string
}
// channels maps a path to the channel serving it.
type channels map[string]*channel
// newHub returns a hub with a 16-slot command queue, an empty channel set and
// a ticker running at pingPeriod. The RethinkDB session is left nil; run sets it.
func newHub() *hub {
	h := new(hub)
	h.queue = make(queue, 16)
	h.channels = make(channels)
	h.ticker = newMTicker(pingPeriod)
	return h
}
// newChannel returns a channel for path that points back at its owning hub h.
// It starts with a 16-slot command queue and no connections; the caller is
// responsible for starting it.
func newChannel(h *hub, path string) *channel {
	c := new(channel)
	c.queue = make(queue, 16)
	c.connections = make(connections)
	c.h = h
	c.path = path
	return c
}
// run connects to RethinkDB, subscribes to the "pinghub" changefeed and then
// serves the hub until h.queue is closed.
//
// Changefeed records and hub commands are both handled on the calling
// goroutine, so h.channels is only ever read and written from one goroutine.
// Looking the map up from a separate goroutine while subscribe/remove mutate
// it is a data race that the Go runtime may abort on.
//
// A connection or subscription failure terminates the process through
// log.Fatalln; an unknown command type panics.
func (h *hub) run() {
	defer h.ticker.stop()

	// Open a connection to rethinkdb
	var err error
	h.session, err = r.Connect(r.ConnectOpts{
		Address:  "localhost:28015",
		Database: "pinghub",
	})
	if err != nil {
		log.Fatalln(err.Error())
	}
	defer h.session.Close()

	// Subscribe to the changefeed
	cursor, err := r.Table("pinghub").Changes().Field("new_val").Run(h.session)
	if err != nil {
		log.Fatalln(err)
	}
	defer cursor.Close()

	// done is closed when run returns so the pump goroutine below can never be
	// left blocked on a send nobody will receive.
	done := make(chan struct{})
	defer close(done)

	// Pump changefeed records to the main loop. The goroutine only decodes and
	// forwards; it never touches hub state.
	records := make(chan record)
	go func(out chan<- record) {
		defer close(out)
		var rec record
		for cursor.Next(&rec) {
			select {
			case out <- rec:
			case <-done:
				return
			}
		}
		if err := cursor.Err(); err != nil {
			log.Println("changefeed stopped:", err)
		}
	}(records)

	for {
		select {
		case rec, ok := <-records:
			if !ok {
				// The changefeed ended; a nil channel is never selected, so
				// the loop keeps serving commands only.
				records = nil
				continue
			}
			if rec.Id == "" || rec.Text == "" {
				continue
			}
			// NOTE(review): like the sends in subscribe, this blocks the hub
			// loop while the target channel's queue is full.
			if ch, found := h.channels[rec.Id]; found {
				ch.queue <- command{cmd: BROADCAST, text: []byte(rec.Text)}
			}

		case cmd, ok := <-h.queue:
			if !ok {
				return
			}
			// Forward cmds to their path's channel queues.
			switch cmd.cmd {
			case SUBSCRIBE:
				h.subscribe(cmd)
			case PUBLISH:
				h.publish(cmd)
			case REMOVE:
				h.remove(cmd)
			default:
				panic(fmt.Sprintf("unexpected hub cmd: %v\n", cmd))
			}
		}
	}
}
// subscribe attaches the command's connection to the channel for cmd.path,
// creating and starting that channel first when it does not exist yet.
func (h *hub) subscribe(cmd command) {
	ch, exists := h.channels[cmd.path]
	if !exists {
		// First subscriber for this path: register the channel and start it.
		ch = newChannel(h, cmd.path)
		h.channels[cmd.path] = ch
		go ch.run()
	}

	// Hand the connection a reference to its channel, then pass the
	// subscription on to the channel itself.
	cmd.conn.control <- ch
	ch.queue <- cmd
}
// publish writes the command's text to the "pinghub" table under the id
// cmd.path, replacing any existing document with that id, and stamps it with
// the current time in Unix nanoseconds.
//
// A failed write is logged together with the path and the underlying error;
// it is not returned to the caller.
func (h *hub) publish(cmd command) {
	doc := map[string]interface{}{
		"id":   cmd.path,
		"text": string(cmd.text),
		"time": time.Now().UnixNano(),
	}
	opts := r.InsertOpts{Conflict: "replace"}

	if _, err := r.Table("pinghub").Insert(doc, opts).RunWrite(h.session); err != nil {
		// Include the cause: a bare "failed" line cannot be diagnosed.
		log.Printf("Failed to publish post to %q: %v", cmd.path, err)
	}
}
// remove drops the channel registered under cmd.path from the hub, if any.
func (h *hub) remove(cmd command) {
	// delete is a no-op when the key is absent, so no existence check is needed.
	delete(h.channels, cmd.path)
}