forked from benthosdev/benthos
/
redis_pubsub.go
166 lines (135 loc) · 3.97 KB
/
redis_pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package reader
import (
"context"
"sync"
"time"
bredis "github.com/dafanshu/benthos/v3/internal/impl/redis"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/message"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"github.com/go-redis/redis/v7"
)
//------------------------------------------------------------------------------
// RedisPubSubConfig contains configuration fields for the RedisPubSub input
// type: the shared Redis client settings plus the subscription targets.
type RedisPubSubConfig struct {
// Config is the embedded Redis client configuration, tagged inline for both
// JSON and YAML.
bredis.Config `json:",inline" yaml:",inline"`
// Channels lists the names passed to Subscribe, or to PSubscribe when
// UsePatterns is true.
Channels []string `json:"channels" yaml:"channels"`
// UsePatterns selects PSubscribe rather than Subscribe when connecting.
UsePatterns bool `json:"use_patterns" yaml:"use_patterns"`
}
// NewRedisPubSubConfig returns a RedisPubSubConfig populated with defaults:
// the default Redis client configuration, a single channel named
// "benthos_chan", and pattern subscriptions disabled.
func NewRedisPubSubConfig() RedisPubSubConfig {
	var conf RedisPubSubConfig
	conf.Config = bredis.NewConfig()
	conf.Channels = []string{"benthos_chan"}
	conf.UsePatterns = false
	return conf
}
//------------------------------------------------------------------------------
// RedisPubSub is an input type that reads Redis Pub/Sub messages.
type RedisPubSub struct {
// client is the Redis client in use; it is nil until a connection has been
// established and is reset to nil on disconnect.
client redis.UniversalClient
// pubsub is the active subscription; it is nil until a connection has been
// established and is reset to nil on disconnect.
pubsub *redis.PubSub
// cMut guards client and pubsub.
cMut sync.Mutex
// conf is the configuration this input was constructed with.
conf RedisPubSubConfig
// stats is the metrics aggregator supplied at construction.
stats metrics.Type
// log is the logger supplied at construction.
log log.Modular
}
// NewRedisPubSub creates a new RedisPubSub input type.
//
// The configuration is validated up front by building a client from it; any
// error from that step is returned and no input is created. The client built
// for validation is not retained (a fresh one is built when connecting), so it
// is closed immediately rather than being left dangling.
func NewRedisPubSub(
	conf RedisPubSubConfig, log log.Modular, stats metrics.Type,
) (*RedisPubSub, error) {
	r := &RedisPubSub{
		conf:  conf,
		stats: stats,
		log:   log,
	}

	probe, err := r.conf.Config.Client()
	if err != nil {
		return nil, err
	}
	// Only the ability to construct a client matters here. A failure to close
	// the throwaway client says nothing about the configuration, so it is
	// deliberately ignored.
	_ = probe.Close()

	return r, nil
}
//------------------------------------------------------------------------------
// Connect establishes a connection to a Redis server by delegating to
// ConnectWithContext with a background context.
func (r *RedisPubSub) Connect() error {
	ctx := context.Background()
	return r.ConnectWithContext(ctx)
}
// ConnectWithContext establishes a connection to a Redis server and subscribes
// to the configured channels (or patterns, when UsePatterns is set). It
// returns nil without doing anything when a client is already held.
//
// ctx is currently unused.
//
// An error is returned when a client cannot be built from the configuration
// or when the initial ping fails. In the latter case the freshly built client
// is closed before returning, so repeated failed attempts do not accumulate
// unused clients.
func (r *RedisPubSub) ConnectWithContext(ctx context.Context) error {
	r.cMut.Lock()
	defer r.cMut.Unlock()

	if r.client != nil {
		return nil
	}

	client, err := r.conf.Config.Client()
	if err != nil {
		return err
	}
	if _, err := client.Ping().Result(); err != nil {
		// The client is never stored on r, so nothing else would close it.
		// The ping failure is the error worth reporting; a close error at
		// this point is deliberately dropped.
		_ = client.Close()
		return err
	}

	r.log.Infof("Receiving Redis pub/sub messages from channels: %v\n", r.conf.Channels)

	r.client = client
	if r.conf.UsePatterns {
		r.pubsub = r.client.PSubscribe(r.conf.Channels...)
	} else {
		r.pubsub = r.client.Subscribe(r.conf.Channels...)
	}
	return nil
}
// Read attempts to receive a message from the Redis pub/sub subscription. It
// delegates to ReadWithContext with a background context and discards the
// returned acknowledgement function.
func (r *RedisPubSub) Read() (types.Message, error) {
	ctx := context.Background()
	payload, _, readErr := r.ReadWithContext(ctx)
	return payload, readErr
}
// ReadWithContext attempts to receive a message from the Redis pub/sub
// subscription, blocking until one arrives or ctx is done.
//
// It returns types.ErrNotConnected when there is no active subscription,
// types.ErrTimeout when ctx is done first, and types.ErrTypeClosed (after
// disconnecting) when the subscription's channel has been closed. On success
// the message holds a single part containing the payload, paired with a no-op
// acknowledgement function.
func (r *RedisPubSub) ReadWithContext(ctx context.Context) (types.Message, AsyncAckFn, error) {
	// Take a snapshot of the subscription under cMut, which also guards it
	// where it is assigned and cleared.
	r.cMut.Lock()
	sub := r.pubsub
	r.cMut.Unlock()

	if sub == nil {
		return nil, nil, types.ErrNotConnected
	}

	select {
	case incoming, ok := <-sub.Channel():
		if ok {
			part := []byte(incoming.Payload)
			return message.New([][]byte{part}), noopAsyncAckFn, nil
		}
		// A closed channel means the subscription is gone; tear down the rest.
		r.disconnect()
		return nil, nil, types.ErrTypeClosed
	case <-ctx.Done():
		return nil, nil, types.ErrTimeout
	}
}
// Acknowledge is a noop since Redis pub/sub channels do not support
// acknowledgements. The err argument is ignored and nil is always returned.
func (r *RedisPubSub) Acknowledge(err error) error {
return nil
}
// disconnect closes the subscription and the client, if present, and resets
// both fields to nil so a later connect starts afresh. It holds cMut for the
// duration.
//
// Both closes are always attempted. The returned error is the first one
// encountered: a failure closing the subscription is not masked by a
// successful client close.
func (r *RedisPubSub) disconnect() error {
	r.cMut.Lock()
	defer r.cMut.Unlock()

	var err error
	if r.pubsub != nil {
		err = r.pubsub.Close()
		r.pubsub = nil
	}
	if r.client != nil {
		// Keep an earlier subscription-close error rather than overwriting it.
		if cErr := r.client.Close(); err == nil {
			err = cErr
		}
		r.client = nil
	}
	return err
}
// CloseAsync shuts down the RedisPubSub input by disconnecting from the
// server. The disconnect runs inline and its error is discarded.
func (r *RedisPubSub) CloseAsync() {
	_ = r.disconnect()
}
// WaitForClose reports that the RedisPubSub input has closed down. It returns
// nil immediately and does not consult timeout: CloseAsync performs the
// disconnect inline, so there is no background shutdown to wait on.
func (r *RedisPubSub) WaitForClose(timeout time.Duration) error {
return nil
}
//------------------------------------------------------------------------------