/
channel.go
125 lines (100 loc) · 2.46 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"context"
"sync"
log "github.com/sirupsen/logrus"
"github.com/go-redis/redis"
)
// channels is the package-level registry of channels created by GetChannel,
// keyed by channel name.
//
// NOTE(review): nothing in this file guards the map with a lock — confirm
// that all access happens from a single goroutine.
var channels = map[string]*Channel{}
// A ChannelSubscriber receives messages from a Channel.
//
// NOTE(review): no code visible in this file references this interface;
// Channel.Subscribers is keyed by *Subscription directly. Presumably
// *Subscription is meant to satisfy it — confirm against its definition.
type ChannelSubscriber interface {
// Receive is handed the message payload as raw bytes.
Receive(message []byte)
}
// A Channel ties one Redis pub/sub channel name to the set of local
// subscriptions that Dispatch delivers incoming messages to.
type Channel struct {
// subscribed is locked by register for as long as the Redis subscription
// is active and unlocked when the listener goroutine exits (or when
// subscribing fails). Close locks it to wait for that exit.
subscribed *sync.Mutex
// cancel cancels the context handed to register, which makes the listener
// goroutine return. It is set by GetChannel.
cancel context.CancelFunc
// Name is the channel name; register passes it to the Redis client's
// Subscribe call and GetChannel uses it as the registry key.
Name string
// Subscribers is the set of registered subscriptions. Subscribe stores
// true for a subscription and Unsubscribe deletes the key; Dispatch
// iterates over the keys.
Subscribers map[*Subscription]bool
}
// GetChannel returns the Channel stored under name in the package-level
// channels map, creating and registering a new one when none exists yet.
//
// A newly created channel gets its own cancellable context (cancelled later
// through Channel.Close) and is only added to the map after register has
// succeeded. If register fails, its error is returned, the context is
// cancelled so it is not leaked, and nothing is stored in the map.
//
// NOTE(review): the channels map is read and written here without locking;
// confirm GetChannel is never called from multiple goroutines at once.
func GetChannel(name string) (*Channel, error) {
	// Reuse the channel if it has already been created.
	if existing, ok := channels[name]; ok {
		return existing, nil
	}

	ctx, cancel := context.WithCancel(context.Background())
	channel := &Channel{
		Name:        name,
		Subscribers: make(map[*Subscription]bool),
		subscribed:  &sync.Mutex{},
		cancel:      cancel,
	}

	// daddy is declared outside this block — presumably the shared Redis
	// client; verify against its definition.
	if err := channel.register(ctx, daddy); err != nil {
		// The channel is discarded, so nobody would ever call its cancel
		// func; release the context here instead of leaking it.
		cancel()
		return nil, err
	}

	channels[name] = channel
	return channel, nil
}
// register subscribes c to the Redis pub/sub channel named c.Name on client
// and starts a goroutine that forwards every received payload to c.Dispatch.
//
// c.subscribed is locked on entry and stays locked while the subscription is
// alive; it is unlocked when subscribing fails or when the listener goroutine
// exits, which is what Close waits on. The goroutine exits when ctx is
// cancelled or when the pubsub message channel is closed.
//
// It returns the error from the initial pubsub.Receive call, if any; in that
// case the pubsub is closed and no goroutine is started.
func (c *Channel) register(ctx context.Context, client *redis.Client) error {
	c.subscribed.Lock()

	pubsub := client.Subscribe(c.Name)

	// Do one blocking receive up front so that a failed subscription is
	// reported to the caller instead of surfacing inside the goroutine.
	if _, err := pubsub.Receive(); err != nil {
		c.subscribed.Unlock()
		pubsub.Close()
		return err
	}

	go func() {
		defer c.subscribed.Unlock()
		defer pubsub.Close()

		// Fetch the message channel once rather than on every iteration.
		messages := pubsub.Channel()
		for {
			select {
			case <-ctx.Done():
				// Cancelled (see Close): stop listening.
				return
			case msg, ok := <-messages:
				// A receive from a closed channel yields a nil message;
				// stop instead of dereferencing it.
				if !ok || msg == nil {
					return
				}
				c.Dispatch(msg.Payload)
			}
		}
	}()

	return nil
}
// Close cancels the channel's context and then blocks until the subscribed
// mutex can be acquired, i.e. until the listener goroutine started by
// register has released it.
//
// NOTE(review): Close does not remove the channel from the package-level
// channels map — confirm callers handle that themselves.
func (c Channel) Close() {
	c.cancel()

	// Acquiring the lock is only a way to wait for the listener to exit;
	// it is handed straight back on return.
	c.subscribed.Lock()
	defer c.subscribed.Unlock()
}
// Subscribe adds subscriber to this channel's Subscribers set. If the
// subscription is already registered, nothing changes and a debug message is
// logged instead.
func (c *Channel) Subscribe(subscriber *Subscription) {
	if registered := c.Subscribers[subscriber]; !registered {
		c.Subscribers[subscriber] = true
		return
	}

	// Duplicate subscription: leave the set untouched.
	log.Debugf("[channel %v] attempted to subscribe but already subscribed: %p", c.Name, subscriber)
}
// Unsubscribe removes subscriber from this channel's Subscribers set. If the
// subscription is not currently registered, nothing changes and a debug
// message is logged instead.
func (c *Channel) Unsubscribe(subscriber *Subscription) {
	if registered := c.Subscribers[subscriber]; registered {
		delete(c.Subscribers, subscriber)
		return
	}

	// Unknown subscription: leave the set untouched.
	log.Debugf("[channel %v] attempted to unsubscribe but not subscribed: %p", c.Name, subscriber)
}
// Dispatch converts message to bytes once and passes that same slice to the
// Receive method of every subscription present as a key in Subscribers.
//
// NOTE(review): Subscribers is iterated here without locking while Subscribe
// and Unsubscribe mutate it — confirm these never run concurrently.
func (c Channel) Dispatch(message string) {
	payload := []byte(message)
	for sub := range c.Subscribers {
		sub.Receive(payload)
	}
}