-
Notifications
You must be signed in to change notification settings - Fork 783
/
resolver.go
97 lines (82 loc) · 2.54 KB
/
resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package field
import (
"strconv"
"github.com/Jeffail/benthos/v3/internal/bloblang/query"
"github.com/Jeffail/benthos/v3/lib/message"
)
//------------------------------------------------------------------------------
// Resolver is an interface for resolving a string containing Bloblang function
// interpolations into either a string or bytes.
type Resolver interface {
ResolveString(index int, msg Message, escaped, legacy bool) string
ResolveBytes(index int, msg Message, escaped, legacy bool) []byte
}
//------------------------------------------------------------------------------
// StaticResolver is a Resolver implementation that simply returns a static
// string
type StaticResolver string
// ResolveString returns a string.
func (s StaticResolver) ResolveString(index int, msg Message, escaped, legacy bool) string {
return string(s)
}
// ResolveBytes returns a byte slice.
func (s StaticResolver) ResolveBytes(index int, msg Message, escaped, legacy bool) []byte {
return []byte(s)
}
//------------------------------------------------------------------------------
// QueryResolver executes a query and returns a string representation of the
// result.
type QueryResolver struct {
fn query.Function
}
// NewQueryResolver creates a field query resolver that returns the result of a
// query function.
func NewQueryResolver(fn query.Function) *QueryResolver {
return &QueryResolver{fn}
}
// ResolveString returns a string.
func (q QueryResolver) ResolveString(index int, msg Message, escaped, legacy bool) string {
if msg == nil {
msg = message.New(nil)
}
return query.ExecToString(q.fn, query.FunctionContext{
Index: index,
MsgBatch: msg,
Legacy: legacy,
NewMsg: msg.Get(index),
}.WithValueFunc(func() *interface{} {
if jObj, err := msg.Get(index).JSON(); err == nil {
return &jObj
}
return nil
}))
}
// ResolveBytes returns a byte slice.
func (q QueryResolver) ResolveBytes(index int, msg Message, escaped, legacy bool) []byte {
if msg == nil {
msg = message.New(nil)
}
bs := query.ExecToBytes(q.fn, query.FunctionContext{
Index: index,
MsgBatch: msg,
Legacy: legacy,
NewMsg: msg.Get(index),
}.WithValueFunc(func() *interface{} {
if jObj, err := msg.Get(index).JSON(); err == nil {
return &jObj
}
return nil
}))
if escaped {
bs = escapeBytes(bs)
}
return bs
}
func escapeBytes(in []byte) []byte {
quoted := strconv.Quote(string(in))
if len(quoted) < 3 {
return in
}
return []byte(quoted[1 : len(quoted)-1])
}
//------------------------------------------------------------------------------