forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
redis_pubsub.go
52 lines (44 loc) · 1.83 KB
/
redis_pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package input
import (
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/impl/redis"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeRedisPubSub] = TypeSpec{
constructor: fromSimpleConstructor(NewRedisPubSub),
Summary: `
Consume from a Redis publish/subscribe channel using either the SUBSCRIBE or
PSUBSCRIBE commands.`,
Description: `
In order to subscribe to channels using the ` + "`PSUBSCRIBE`" + ` command set
the field ` + "`use_patterns` to `true`" + `, then you can include glob-style
patterns in your channel names. For example:
- ` + "`h?llo`" + ` subscribes to hello, hallo and hxllo
- ` + "`h*llo`" + ` subscribes to hllo and heeeello
- ` + "`h[ae]llo`" + ` subscribes to hello and hallo, but not hillo
Use ` + "`\\`" + ` to escape special characters if you want to match them
verbatim.`,
FieldSpecs: redis.ConfigDocs().Add(
docs.FieldCommon("channels", "A list of channels to consume from.").Array(),
docs.FieldCommon("use_patterns", "Whether to use the PSUBSCRIBE command."),
),
Categories: []Category{
CategoryServices,
},
}
}
//------------------------------------------------------------------------------
// NewRedisPubSub builds a RedisPubSub input. It constructs the underlying
// reader from conf.RedisPubSub, wraps that reader with
// reader.NewAsyncPreserver, and hands the result to NewAsyncReader.
//
// The mgr argument is accepted as part of the constructor signature but is
// not used here. If the underlying reader cannot be created, its error is
// returned unchanged together with a nil Type.
func NewRedisPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	pubSubReader, err := reader.NewRedisPubSub(conf.RedisPubSub, log, stats)
	if err != nil {
		return nil, err
	}

	preserved := reader.NewAsyncPreserver(pubSubReader)
	return NewAsyncReader(TypeRedisPubSub, true, preserved, log, stats)
}
//------------------------------------------------------------------------------