forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
switch_deprecated.go
133 lines (111 loc) · 3.15 KB
/
switch_deprecated.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package processor
import (
"fmt"
"strconv"
"time"
"github.com/dafanshu/benthos/v3/internal/interop"
"github.com/dafanshu/benthos/v3/internal/tracing"
"github.com/dafanshu/benthos/v3/lib/condition"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
type switchCaseDeprecated struct {
condition types.Condition
processors []types.Processor
fallThrough bool
}
type switchDeprecated struct {
cases []switchCaseDeprecated
log log.Modular
mCount metrics.StatCounter
mSent metrics.StatCounter
mBatchSent metrics.StatCounter
}
func newSwitchDeprecated(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
var cases []switchCaseDeprecated
for i, caseConf := range conf.Switch {
prefix := strconv.Itoa(i)
var err error
var cond types.Condition
var procs []types.Processor
cMgr, cLog, cStats := interop.LabelChild(prefix+".condition", mgr, log, stats)
if cond, err = condition.New(caseConf.Condition, cMgr, cLog, cStats); err != nil {
return nil, fmt.Errorf("case [%v] condition: %w", i, err)
}
for j, procConf := range caseConf.Processors {
pMgr, pLog, pStats := interop.LabelChild(prefix+"."+strconv.Itoa(j), mgr, log, stats)
var proc types.Processor
if proc, err = New(procConf, pMgr, pLog, pStats); err != nil {
return nil, fmt.Errorf("case [%v] processor [%v]: %w", i, j, err)
}
procs = append(procs, proc)
}
cases = append(cases, switchCaseDeprecated{
condition: cond,
processors: procs,
fallThrough: caseConf.Fallthrough,
})
}
return &switchDeprecated{
cases: cases,
log: log,
mCount: stats.GetCounter("count"),
mSent: stats.GetCounter("sent"),
mBatchSent: stats.GetCounter("batch.sent"),
}, nil
}
//------------------------------------------------------------------------------
func (s *switchDeprecated) ProcessMessage(msg types.Message) (msgs []types.Message, res types.Response) {
s.mCount.Incr(1)
var procs []types.Processor
fellthrough := false
spans := tracing.CreateChildSpans(TypeSwitch, msg)
for i, switchCase := range s.cases {
if !fellthrough && !switchCase.condition.Check(msg) {
continue
}
procs = append(procs, switchCase.processors...)
for _, s := range spans {
s.LogKV(
"event", "case_match",
"value", strconv.Itoa(i),
)
}
if fellthrough = switchCase.fallThrough; !fellthrough {
break
}
}
for _, s := range spans {
s.Finish()
}
msgs, res = ExecuteAll(procs, msg)
s.mBatchSent.Incr(int64(len(msgs)))
totalParts := 0
for _, msg := range msgs {
totalParts += msg.Len()
}
s.mSent.Incr(int64(totalParts))
return
}
func (s *switchDeprecated) CloseAsync() {
for _, s := range s.cases {
for _, proc := range s.processors {
proc.CloseAsync()
}
}
}
func (s *switchDeprecated) WaitForClose(timeout time.Duration) error {
stopBy := time.Now().Add(timeout)
for _, s := range s.cases {
for _, proc := range s.processors {
if err := proc.WaitForClose(time.Until(stopBy)); err != nil {
return err
}
}
}
return nil
}
//------------------------------------------------------------------------------