-
Notifications
You must be signed in to change notification settings - Fork 2
/
pubsub.go
63 lines (52 loc) · 1.55 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package redis
import (
"github.com/go-redis/redis"
"github.com/golang/protobuf/proto"
"libs.altipla.consulting/errors"
)
// PubSub represents a named redis PubSub channel reached through a Database.
// It can be used to publish and receive protobuf messages.
type PubSub struct {
// db is the database whose session (db.sess) issues the subscribe and
// publish calls.
db *Database
// name is the redis channel name passed to both Subscribe and Publish.
name string
}
// Subscribe subscribes to the channel pubsub.name through the database
// session and returns a PubSubSubscription wrapping the resulting go-redis
// PubSub together with its message channel. Messages are then read with Next;
// call Close on the returned subscription when it is no longer needed.
func (pubsub *PubSub) Subscribe() *PubSubSubscription {
	conn := pubsub.db.sess.Subscribe(pubsub.name)

	sub := new(PubSubSubscription)
	sub.ps = conn
	sub.ch = conn.Channel()
	return sub
}
// Publish serializes msg with protobuf and publishes the resulting bytes on
// the channel pubsub.name. Only subscriptions connected at the same time will
// receive the message.
//
// It returns a wrapped error if the message cannot be serialized or if the
// publish command fails, and nil otherwise.
func (pubsub *PubSub) Publish(msg proto.Message) error {
	payload, marshalErr := proto.Marshal(msg)
	if marshalErr != nil {
		return errors.Wrapf(marshalErr, "cannot serialize pubsub message")
	}

	// The binary protobuf encoding is sent as a string payload.
	cmd := pubsub.db.sess.Publish(pubsub.name, string(payload))
	if publishErr := cmd.Err(); publishErr != nil {
		return errors.Wrapf(publishErr, "cannot publish pubsub message")
	}

	return nil
}
// PubSubSubscription stores the state of an active connection to the server:
// the underlying go-redis PubSub and the channel its messages are read from.
type PubSubSubscription struct {
// ps is the go-redis PubSub created by PubSub.Subscribe; Close closes it.
ps *redis.PubSub
// ch is the receive-only message channel obtained from ps.Channel(); Next
// reads from it.
ch <-chan *redis.Message
}
// Close closes the underlying go-redis PubSub, ending the subscription.
func (sub *PubSubSubscription) Close() {
	// This method has no error result, so whatever the underlying Close
	// reports is deliberately discarded.
	_ = sub.ps.Close()
}
// Next blocks until the next message arrives on the subscription channel and
// decodes its payload into dest.
//
// It returns ErrDone when the receive yields no message (a nil value, which is
// also what a closed channel produces), and a wrapped error when the payload
// cannot be parsed as the protobuf message dest.
func (sub *PubSubSubscription) Next(dest proto.Message) error {
	received, open := <-sub.ch
	if !open || received == nil {
		return ErrDone
	}

	raw := []byte(received.Payload)
	if err := proto.Unmarshal(raw, dest); err != nil {
		return errors.Wrapf(err, "cannot parse pubsub message")
	}
	return nil
}