/
package.go
177 lines (148 loc) · 4.54 KB
/
package.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Package query provides a parser for the right-hand side query part of the
// bloblang spec. This is useful as a separate package as it is used in
// isolation within interpolation functions.
package query
import (
"fmt"
"github.com/Jeffail/benthos/v3/lib/types"
)
// ErrRecoverable represents a function execution error that can optionally be
// recovered into a zero-value.
type ErrRecoverable struct {
Recovered interface{}
Err error
}
// Unwrap the error.
func (e *ErrRecoverable) Unwrap() error {
return e.Err
}
// Error implements the standard error interface.
func (e *ErrRecoverable) Error() string {
return e.Err.Error()
}
//------------------------------------------------------------------------------
type badFunctionErr string
func (e badFunctionErr) Error() string {
return fmt.Sprintf("unrecognised function '%v'", string(e))
}
type badMethodErr string
func (e badMethodErr) Error() string {
return fmt.Sprintf("unrecognised method '%v'", string(e))
}
//------------------------------------------------------------------------------
// MessageBatch is an interface type to be given to a query function, it allows
// the function to resolve fields and metadata from a Benthos message batch.
type MessageBatch interface {
Get(p int) types.Part
Len() int
}
// FunctionContext provides access to a range of query targets for functions to
// reference.
type FunctionContext struct {
Maps map[string]Function
Vars map[string]interface{}
Index int
MsgBatch MessageBatch
Legacy bool
// Reference new message being mapped
NewMsg types.Part
valueFn func() *interface{}
value *interface{}
nextValue *interface{}
namedValue *namedContextValue
// Used to track how many maps we've entered.
stackCount int
}
type namedContextValue struct {
name string
value interface{}
next *namedContextValue
}
// IncrStackCount increases the count stored in the function context of how many
// maps we've entered and returns the current count.
// nolint:gocritic // Ignore unnamedResult false positive
func (ctx FunctionContext) IncrStackCount() (FunctionContext, int) {
ctx.stackCount++
return ctx, ctx.stackCount
}
// NamedValue returns the value of a named context if it exists.
func (ctx FunctionContext) NamedValue(name string) (interface{}, bool) {
current := ctx.namedValue
for current != nil {
if current.name == name {
return current.value, true
}
current = current.next
}
return nil, false
}
// WithNamedValue returns a FunctionContext with a named value.
func (ctx FunctionContext) WithNamedValue(name string, value interface{}) FunctionContext {
previous := ctx.namedValue
ctx.namedValue = &namedContextValue{
name: name,
value: value,
next: previous,
}
return ctx
}
// Value returns a lazily evaluated context value. A context value is not always
// available and can therefore be nil.
func (ctx FunctionContext) Value() *interface{} {
if ctx.value != nil {
return ctx.value
}
if ctx.valueFn == nil {
return nil
}
return ctx.valueFn()
}
// WithValueFunc returns a function context with a new value func.
func (ctx FunctionContext) WithValueFunc(fn func() *interface{}) FunctionContext {
ctx.valueFn = fn
return ctx
}
// WithValue returns a function context with a new value.
func (ctx FunctionContext) WithValue(value interface{}) FunctionContext {
ctx.nextValue = ctx.value
ctx.value = &value
return ctx
}
// PopValue returns the current default value, and a function context with the
// top value removed from the context stack. If the value returned is the
// absolute root value function then the context returned is unchanged. If there
// is no current default value then a nil value is returned and the context
// returned is unchanged.
func (ctx FunctionContext) PopValue() (*interface{}, FunctionContext) {
retValue := ctx.Value()
if ctx.nextValue != nil {
ctx.value = ctx.nextValue
ctx.nextValue = nil
} else {
ctx.value = nil
}
return retValue, ctx
}
//------------------------------------------------------------------------------
// ExecToString returns a string from a function exection.
func ExecToString(fn Function, ctx FunctionContext) string {
v, err := fn.Exec(ctx)
if err != nil {
if rec, ok := err.(*ErrRecoverable); ok {
return IToString(rec.Recovered)
}
return ""
}
return IToString(v)
}
// ExecToBytes returns a byte slice from a function exection.
func ExecToBytes(fn Function, ctx FunctionContext) []byte {
v, err := fn.Exec(ctx)
if err != nil {
if rec, ok := err.(*ErrRecoverable); ok {
return IToBytes(rec.Recovered)
}
return nil
}
return IToBytes(v)
}