forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
resource.go
128 lines (109 loc) · 3.35 KB
/
resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package input
import (
"context"
"time"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/interop"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeResource] = TypeSpec{
constructor: fromSimpleConstructor(NewResource),
Summary: `
Resource is an input type that runs a resource input by its name.`,
Description: `
This input allows you to reference the same configured input resource in multiple places, and can also tidy up large nested configs. For
example, the config:
` + "```yaml" + `
input:
broker:
inputs:
- kafka:
addresses: [ TODO ]
topics: [ foo ]
consumer_group: foogroup
- gcp_pubsub:
project: bar
subscription: baz
` + "```" + `
Could also be expressed as:
` + "```yaml" + `
input:
broker:
inputs:
- resource: foo
- resource: bar
input_resources:
- label: foo
kafka:
addresses: [ TODO ]
topics: [ foo ]
consumer_group: foogroup
- label: bar
gcp_pubsub:
project: bar
subscription: baz
` + "```" + `
You can find out more about resources [in this document.](/docs/configuration/resources)`,
Categories: []Category{
CategoryUtility,
},
config: docs.FieldComponent().HasType(docs.FieldTypeString).HasDefault(""),
}
}
//------------------------------------------------------------------------------
// Resource is an input that wraps an input resource.
type Resource struct {
mgr types.Manager
name string
log log.Modular
mErrNotFound metrics.StatCounter
}
// NewResource returns a resource input.
func NewResource(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
if err := interop.ProbeInput(context.Background(), mgr, conf.Resource); err != nil {
return nil, err
}
return &Resource{
mgr: mgr,
name: conf.Resource,
log: log,
mErrNotFound: stats.GetCounter("error_not_found"),
}, nil
}
//------------------------------------------------------------------------------
// TransactionChan returns a transactions channel for consuming messages from
// this input type.
func (r *Resource) TransactionChan() (tChan <-chan types.Transaction) {
if err := interop.AccessInput(context.Background(), r.mgr, r.name, func(i types.Input) {
tChan = i.TransactionChan()
}); err != nil {
r.log.Debugf("Failed to obtain input resource '%v': %v", r.name, err)
r.mErrNotFound.Incr(1)
}
return
}
// Connected returns a boolean indicating whether this input is currently
// connected to its target.
func (r *Resource) Connected() (isConnected bool) {
if err := interop.AccessInput(context.Background(), r.mgr, r.name, func(i types.Input) {
isConnected = i.Connected()
}); err != nil {
r.log.Debugf("Failed to obtain input resource '%v': %v", r.name, err)
r.mErrNotFound.Incr(1)
}
return
}
// CloseAsync shuts down the processor and stops processing requests.
func (r *Resource) CloseAsync() {
}
// WaitForClose blocks until the processor has closed down.
func (r *Resource) WaitForClose(timeout time.Duration) error {
return nil
}
//------------------------------------------------------------------------------