forked from benthosdev/benthos
/
package.go
161 lines (135 loc) · 4.19 KB
/
package.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Package query provides a parser for the right-hand side query part of the
// bloblang spec. This is useful as a separate package as it is used in
// isolation within interpolation functions.
package query
import (
"fmt"
"github.com/dafanshu/benthos/v4/internal/message"
)
type badFunctionErr string
func (e badFunctionErr) Error() string {
return fmt.Sprintf("unrecognised function '%v'", string(e))
}
type badMethodErr string
func (e badMethodErr) Error() string {
return fmt.Sprintf("unrecognised method '%v'", string(e))
}
//------------------------------------------------------------------------------
// MessageBatch is an interface type to be given to a query function, it allows
// the function to resolve fields and metadata from a Benthos message batch.
type MessageBatch interface {
Get(p int) *message.Part
Len() int
}
// MetaMsg provides access to the metadata of a message.
type MetaMsg interface {
MetaSetMut(key string, value any)
MetaGetStr(key string) string
MetaGetMut(key string) (any, bool)
MetaDelete(key string)
MetaIterMut(f func(k string, v any) error) error
MetaIterStr(f func(k, v string) error) error
}
// FunctionContext provides access to a range of query targets for functions to
// reference.
type FunctionContext struct {
Maps map[string]Function
Vars map[string]any
Index int
MsgBatch MessageBatch
// Reference new message being mapped
NewMeta MetaMsg
NewValue *any
valueFn func() *any
value *any
nextValue *any
namedValue *namedContextValue
// Used to track how many maps we've entered.
stackCount int
}
type namedContextValue struct {
name string
value any
next *namedContextValue
}
// IncrStackCount increases the count stored in the function context of how many
// maps we've entered and returns the current count.
func (ctx FunctionContext) IncrStackCount() (FunctionContext, int) { //nolint: gocritic // Ignore unnamedResult false positive
ctx.stackCount++
return ctx, ctx.stackCount
}
// NamedValue returns the value of a named context if it exists.
func (ctx FunctionContext) NamedValue(name string) (any, bool) {
current := ctx.namedValue
for current != nil {
if current.name == name {
return current.value, true
}
current = current.next
}
return nil, false
}
// WithNamedValue returns a FunctionContext with a named value.
func (ctx FunctionContext) WithNamedValue(name string, value any) FunctionContext {
previous := ctx.namedValue
ctx.namedValue = &namedContextValue{
name: name,
value: value,
next: previous,
}
return ctx
}
// Value returns a lazily evaluated context value. A context value is not always
// available and can therefore be nil.
func (ctx FunctionContext) Value() *any {
if ctx.value != nil {
return ctx.value
}
if ctx.valueFn == nil {
return nil
}
return ctx.valueFn()
}
// WithValueFunc returns a function context with a new value func.
func (ctx FunctionContext) WithValueFunc(fn func() *any) FunctionContext {
ctx.valueFn = fn
return ctx
}
// WithValue returns a function context with a new value.
func (ctx FunctionContext) WithValue(value any) FunctionContext {
ctx.nextValue = ctx.value
ctx.value = &value
return ctx
}
// PopValue returns the current default value, and a function context with the
// top value removed from the context stack. If the value returned is the
// absolute root value function then the context returned is unchanged. If there
// is no current default value then a nil value is returned and the context
// returned is unchanged.
func (ctx FunctionContext) PopValue() (*any, FunctionContext) {
retValue := ctx.Value()
if ctx.nextValue != nil {
ctx.value = ctx.nextValue
ctx.nextValue = nil
} else {
ctx.value = nil
}
return retValue, ctx
}
//------------------------------------------------------------------------------
// ExecToString returns a string from a function execution.
func ExecToString(fn Function, ctx FunctionContext) (string, error) {
v, err := fn.Exec(ctx)
if err != nil {
return "", err
}
return IToString(v), nil
}
// ExecToBytes returns a byte slice from a function execution.
func ExecToBytes(fn Function, ctx FunctionContext) ([]byte, error) {
v, err := fn.Exec(ctx)
if err != nil {
return nil, err
}
return IToBytes(v), nil
}