forked from benthosdev/benthos
/
dynamic.go
178 lines (157 loc) · 5.19 KB
/
dynamic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package output
import (
"encoding/json"
"fmt"
"path"
"sync"
"time"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/interop"
"github.com/dafanshu/benthos/v3/lib/api"
"github.com/dafanshu/benthos/v3/lib/broker"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"gopkg.in/yaml.v3"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeDynamic] = TypeSpec{
constructor: fromSimpleConstructor(NewDynamic),
Summary: `
A special broker type where the outputs are identified by unique labels and can
be created, changed and removed during runtime via a REST API.`,
Description: `
The broker pattern used is always ` + "`fan_out`" + `, meaning each message will
be delivered to each dynamic output.
To GET a JSON map of output identifiers with their current uptimes use the
'/outputs' endpoint.
To perform CRUD actions on the outputs themselves use POST, DELETE, and GET
methods on the ` + "`/outputs/{output_id}`" + ` endpoint. When using POST the
body of the request should be a YAML configuration for the output, if the output
already exists it will be changed.`,
FieldSpecs: docs.FieldSpecs{
// TODO: Update with component type.
docs.FieldCommon("outputs", "A map of outputs to statically create.").Map().HasType(docs.FieldTypeOutput),
docs.FieldCommon("prefix", "A path prefix for HTTP endpoints that are registered."),
docs.FieldCommon("timeout", "The server side timeout of HTTP requests."),
docs.FieldCommon(
"max_in_flight", "The maximum number of messages to dispatch across child outputs at any given time.",
),
},
Categories: []Category{
CategoryUtility,
},
}
}
//------------------------------------------------------------------------------
// DynamicConfig contains configuration fields for the Dynamic output type.
// Each field is exposed under the same snake_case key in both JSON and YAML.
type DynamicConfig struct {
// Outputs is a map of outputs to statically create, keyed by the label
// that identifies each output.
Outputs map[string]Config `json:"outputs" yaml:"outputs"`
// Prefix is a path prefix for the HTTP endpoints that are registered.
Prefix string `json:"prefix" yaml:"prefix"`
// Timeout is the server side timeout of HTTP requests, written as a
// duration string (NewDynamic parses it with time.ParseDuration).
Timeout string `json:"timeout" yaml:"timeout"`
// MaxInFlight is the maximum number of messages to dispatch across child
// outputs at any given time.
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
}
// NewDynamicConfig returns a DynamicConfig populated with defaults: an empty
// (non-nil) outputs map, no endpoint prefix, a "5s" timeout and a single
// message in flight.
func NewDynamicConfig() DynamicConfig {
	// Prefix is left at its zero value, the empty string.
	var conf DynamicConfig
	conf.Outputs = make(map[string]Config)
	conf.Timeout = "5s"
	conf.MaxInFlight = 1
	return conf
}
//------------------------------------------------------------------------------
// NewDynamic creates a new Dynamic output type.
//
// It constructs one child output for every entry in conf.Dynamic.Outputs,
// wraps them in a dynamic fan out broker limited to conf.Dynamic.MaxInFlight,
// and registers two HTTP endpoints on mgr beneath conf.Dynamic.Prefix:
// "/outputs/{id}" for CRUD operations on a single output and "/outputs" for
// listing them.
//
// An error is returned when conf.Dynamic.Timeout is not a valid duration
// string, when a statically configured output fails to construct, or when the
// broker cannot be created. The caller's conf.Dynamic.Outputs map is never
// modified.
func NewDynamic(
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
) (Type, error) {
	// Validate the timeout first so that a bad value is rejected before any
	// child output has been constructed. An empty string leaves the timeout
	// at zero.
	var reqTimeout time.Duration
	if tout := conf.Dynamic.Timeout; len(tout) > 0 {
		var err error
		if reqTimeout, err = time.ParseDuration(tout); err != nil {
			return nil, fmt.Errorf("failed to parse timeout string: %w", err)
		}
	}

	dynAPI := api.NewDynamic()

	// outputConfigs holds the configs of outputs that have been handed to the
	// broker but not yet reported to the API by the on-add callback. It is a
	// private copy: the callbacks below insert into and delete from it, which
	// must not leak into the caller's config, and a nil map in the config
	// must not cause a panic on the first update.
	outputs := make(map[string]broker.DynamicOutput, len(conf.Dynamic.Outputs))
	outputConfigs := make(map[string]Config, len(conf.Dynamic.Outputs))
	for k, v := range conf.Dynamic.Outputs {
		newOutput, err := New(v, mgr, log, stats)
		if err != nil {
			return nil, err
		}
		outputs[k] = newOutput
		outputConfigs[k] = v
	}

	// Guards outputConfigs, which is shared between the broker callbacks and
	// the API update handler.
	var outputConfigsMut sync.Mutex

	fanOut, err := broker.NewDynamicFanOut(
		outputs, log, stats,
		broker.OptDynamicFanOutSetOnAdd(func(l string) {
			outputConfigsMut.Lock()
			defer outputConfigsMut.Unlock()

			uConf, exists := outputConfigs[l]
			if !exists {
				return
			}

			// Reporting to the API is best effort: failures to sanitise or
			// serialise the config are logged and the output is still
			// reported as started.
			sConf, bErr := SanitiseConfig(uConf)
			if bErr != nil {
				log.Errorf("Failed to sanitise config: %v\n", bErr)
			}
			confBytes, mErr := json.Marshal(sConf)
			if mErr != nil {
				log.Errorf("Failed to marshal config of output '%v': %v\n", l, mErr)
			}
			dynAPI.Started(l, confBytes)
			delete(outputConfigs, l)
		}),
		broker.OptDynamicFanOutSetOnRemove(func(l string) {
			dynAPI.Stopped(l)
		}),
	)
	if err != nil {
		return nil, err
	}
	fanOut = fanOut.WithMaxInFlight(conf.Dynamic.MaxInFlight)

	// Create or replace an output from the YAML config in the request body.
	dynAPI.OnUpdate(func(id string, c []byte) error {
		newConf := NewConfig()
		if err := yaml.Unmarshal(c, &newConf); err != nil {
			return err
		}

		oMgr, oLog, oStats := interop.LabelChild(fmt.Sprintf("dynamic.outputs.%v", id), mgr, log, stats)
		oStats = metrics.Combine(stats, oStats)
		newOutput, err := New(newConf, oMgr, oLog, oStats)
		if err != nil {
			return err
		}

		// Stash the config before handing the output to the broker so that
		// the on-add callback can find it.
		outputConfigsMut.Lock()
		outputConfigs[id] = newConf
		outputConfigsMut.Unlock()

		if err = fanOut.SetOutput(id, newOutput, reqTimeout); err != nil {
			log.Errorf("Failed to set output '%v': %v", id, err)
			// The output was not set, so drop the stashed config again.
			outputConfigsMut.Lock()
			delete(outputConfigs, id)
			outputConfigsMut.Unlock()
		}
		return err
	})

	// Remove an output by setting it to nil on the broker.
	dynAPI.OnDelete(func(id string) error {
		err := fanOut.SetOutput(id, nil, reqTimeout)
		if err != nil {
			log.Errorf("Failed to close output '%v': %v", id, err)
		}
		return err
	})

	mgr.RegisterEndpoint(
		path.Join(conf.Dynamic.Prefix, "/outputs/{id}"),
		"Perform CRUD operations on the configuration of dynamic outputs. For"+
			" more information read the `dynamic` output type documentation.",
		dynAPI.HandleCRUD,
	)
	mgr.RegisterEndpoint(
		path.Join(conf.Dynamic.Prefix, "/outputs"),
		"Get a map of running output identifiers with their current uptimes.",
		dynAPI.HandleList,
	)
	return fanOut, nil
}
//------------------------------------------------------------------------------