forked from revel/revel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chatroom.go
114 lines (96 loc) · 2.46 KB
/
chatroom.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package chatroom
import (
"container/list"
"time"
)
type Event struct {
Type string // "join", "leave", or "message"
User string
Timestamp int // Unix timestmap (secs)
Text string // What the user said (if Type == "say")
}
type Subscription struct {
Archive []Event // All the events from the archive.
New <-chan Event // New events coming in.
}
// Owner of a subscription must cancel it when they stop listening to events.
func (s Subscription) Cancel() {
unsubscribe <- s.New // Unsubscribe the channel.
drain(s.New) // Drain it, just in case there was a pending publish.
}
func newEvent(typ, user, msg string) Event {
return Event{typ, user, int(time.Now().Unix()), msg}
}
func Subscribe() Subscription {
resp := make(chan Subscription)
subscribe <- resp
return <-resp
}
func Join(user string) {
publish <- newEvent("join", user, "")
}
func Say(user, message string) {
publish <- newEvent("message", user, message)
}
func Leave(user string) {
publish <- newEvent("leave", user, "")
}
const archiveSize = 10
var (
// Send a channel here to get room events back. It will send the entire
// archive initially, and then new messages as they come in.
subscribe = make(chan (chan<- Subscription))
// Send a channel here to unsubscribe.
unsubscribe = make(chan (<-chan Event))
// Send events here to publish them.
publish = make(chan Event)
)
// This function loops forever, handling the chat room pubsub
func chatroom() {
// subscribers := map[string]chan<-Event // map user to channel
archive := list.New()
subscribers := list.New()
for {
select {
case ch := <-subscribe:
var events []Event
for e := archive.Front(); e != nil; e = e.Next() {
events = append(events, e.Value.(Event))
}
subscriber := make(chan Event)
subscribers.PushBack(subscriber)
ch <- Subscription{events, subscriber}
case event := <-publish:
for ch := subscribers.Front(); ch != nil; ch = ch.Next() {
ch.Value.(chan Event) <- event
}
if archive.Len() >= archiveSize {
archive.Remove(archive.Front())
}
archive.PushBack(event)
case unsub := <-unsubscribe:
for ch := subscribers.Front(); ch != nil; ch = ch.Next() {
if ch.Value.(chan Event) == unsub {
subscribers.Remove(ch)
}
}
}
}
}
func init() {
go chatroom()
}
// Helpers
// Drains a given channel of any messages.
func drain(ch <-chan Event) {
for {
select {
case _, ok := <-ch:
if !ok {
return
}
default:
return
}
}
}