forked from bpowers/radix
/
subscription.go
173 lines (142 loc) · 3.67 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package redis
import (
"log"
)
// subType enumerates the kinds of subscription request that the Subscription
// methods pass to the connection's subscription call.
type subType uint8

const (
	// subSubscribe is the request kind used by Subscription.Subscribe.
	subSubscribe = subType(iota)
	// subUnsubscribe is the request kind used by Subscription.Unsubscribe.
	subUnsubscribe
	// subPsubscribe is the request kind used by Subscription.Psubscribe.
	subPsubscribe
	// subPunsubscribe is the request kind used by Subscription.Punsubscribe.
	subPunsubscribe
)
// Subscription is a structure for holding a Redis subscription for multiple channels.
// It pairs one connection with the callback that receives the messages read
// from that connection. Instances are built by newSubscription.
type Subscription struct {
// c is the connection that subscription requests are issued on and that the
// listener goroutine reads replies from.
c *conn
// msgHdlr is the callback handed to newSubscription; the listener starts it
// on a new goroutine for every message it dispatches.
msgHdlr func(msg *Message)
}
// newSubscription opens a connection for the given configuration and wraps it
// in a Subscription that delivers incoming messages to msgHdlr.
//
// If the connection cannot be created, the connection error is returned and
// the Subscription is nil. Otherwise the listener goroutine is started before
// the Subscription is returned.
func newSubscription(config *Config, msgHdlr func(msg *Message)) (*Subscription, *Error) {
	// Connection handling
	c, err := newConn(config)
	if err != nil {
		return nil, err
	}

	// A connection in pubsub mode may sit idle between messages, so the read
	// timeout is switched off before the listener starts reading.
	c.noReadTimeout = true

	sub := &Subscription{
		c:       c,
		msgHdlr: msgHdlr,
	}
	go sub.listener()

	return sub, nil
}
// Subscribe subscribes to given channels or returns an error.
// It forwards the channel names to the connection's subscription call with the
// subSubscribe request kind and returns that call's result unchanged.
func (s *Subscription) Subscribe(channels ...string) (err *Error) {
return s.c.subscription(subSubscribe, channels)
}
// Unsubscribe unsubscribes from given channels or returns an error.
// It forwards the channel names to the connection's subscription call with the
// subUnsubscribe request kind and returns that call's result unchanged.
func (s *Subscription) Unsubscribe(channels ...string) (err *Error) {
return s.c.subscription(subUnsubscribe, channels)
}
// Psubscribe subscribes to given patterns or returns an error.
// It forwards the patterns to the connection's subscription call with the
// subPsubscribe request kind and returns that call's result unchanged.
func (s *Subscription) Psubscribe(patterns ...string) (err *Error) {
return s.c.subscription(subPsubscribe, patterns)
}
// Punsubscribe unsubscribes from given patterns or returns an error.
// It forwards the patterns to the connection's subscription call with the
// subPunsubscribe request kind and returns that call's result unchanged.
func (s *Subscription) Punsubscribe(patterns ...string) (err *Error) {
return s.c.subscription(subPunsubscribe, patterns)
}
// Close closes the subscription.
// It only closes the underlying connection; it does not wait for the listener
// goroutine, which returns on its own once it sees a nil message on a closed
// connection.
func (s *Subscription) Close() {
// just sack the connection, listener will close down eventually.
s.c.close()
}
// readMessage reads one reply from the connection and converts it into a
// pubsub Message.
//
// A valid reply is a multi reply with at least three elements: the message
// kind, a channel or pattern, and then either a subscription count
// (subscribe/unsubscribe/psubscribe/punsubscribe) or a payload (message).
// A pmessage carries the pattern, the originating channel and the payload, so
// it needs four elements.
//
// It returns nil for any reply that does not fit this shape. If such a reply
// carries an error that is not a connection error, the error is logged first.
func (s *Subscription) readMessage() *Message {
	var (
		err       error
		kind, arg string
	)

	reply := s.c.read()

	// invalid handles an error or malformed reply.
	// We shouldn't generally get these, except when closing.
	invalid := func() *Message {
		if reply.Err != nil && !reply.Err.Test(ErrorConnection) {
			log.Printf("received an unexpected error reply while in pubsub mode: %s.\n ignoring...",
				reply.Err)
		}
		return nil
	}

	if reply.Type != ReplyMulti || len(reply.Elems) < 3 {
		return invalid()
	}

	// The first element names the kind of message.
	if kind, err = reply.Elems[0].Str(); err != nil {
		return invalid()
	}

	msg := new(Message)
	switch kind {
	case "subscribe":
		msg.Type = MessageSubscribe
	case "unsubscribe":
		msg.Type = MessageUnsubscribe
	case "psubscribe":
		msg.Type = MessagePsubscribe
	case "punsubscribe":
		msg.Type = MessagePunsubscribe
	case "message":
		msg.Type = MessageMessage
	case "pmessage":
		msg.Type = MessagePmessage
	default:
		return invalid()
	}

	// The second element is a channel or a pattern, depending on the kind.
	if arg, err = reply.Elems[1].Str(); err != nil {
		return invalid()
	}

	switch msg.Type {
	case MessageSubscribe, MessageUnsubscribe:
		msg.Channel = arg
		// The third element is the number of subscriptions.
		if msg.Subscriptions, err = reply.Elems[2].Int(); err != nil {
			return invalid()
		}

	case MessagePsubscribe, MessagePunsubscribe:
		msg.Pattern = arg
		// The third element is the number of subscriptions.
		if msg.Subscriptions, err = reply.Elems[2].Int(); err != nil {
			return invalid()
		}

	case MessageMessage:
		msg.Channel = arg
		// The third element is the payload.
		if msg.Payload, err = reply.Elems[2].Str(); err != nil {
			return invalid()
		}

	case MessagePmessage:
		msg.Pattern = arg
		// The third element is the name of the originating channel.
		if msg.Channel, err = reply.Elems[2].Str(); err != nil {
			return invalid()
		}
		// A pmessage has one more element than the other kinds.
		if len(reply.Elems) < 4 {
			return invalid()
		}
		// The fourth element is the payload.
		if msg.Payload, err = reply.Elems[3].Str(); err != nil {
			return invalid()
		}

	default:
		return invalid()
	}

	return msg
}
// listener is a goroutine for reading and handling pubsub messages.
//
// It loops over readMessage and starts the message handler on a new goroutine
// for every message it receives, so handlers may run concurrently and in no
// guaranteed order. The loop ends once readMessage yields nil and the
// connection reports that it is closed.
func (s *Subscription) listener() {
	// read until connection is closed
	for {
		m := s.readMessage()
		if m == nil {
			if s.c.closed() {
				return
			}
			// readMessage returns nil for replies it could not parse (and has
			// already logged unexpected errors). The connection is still open,
			// so skip this reply instead of handing a nil message to the handler.
			continue
		}

		go s.msgHdlr(m)
	}
}