/
processor.go
109 lines (92 loc) · 2.7 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package msgpack
import (
"context"
"fmt"
"github.com/Jeffail/benthos/v3/public/service"
"github.com/vmihailenco/msgpack/v5"
)
// processorConfig assembles the configuration spec for the msgpack processor.
// The spec is categorised under "Parsing", carries a one-line summary, exposes
// a single annotated enum field named "operator", and is tagged with the
// version "3.59.0".
func processorConfig() *service.ConfigSpec {
	// The "operator" field selects the conversion direction; each enum value
	// is paired with the description shown in the generated documentation.
	operatorField := service.NewStringAnnotatedEnumField("operator", map[string]string{
		"to_json":   "Convert MessagePack messages to JSON format",
		"from_json": "Convert JSON messages to MessagePack format",
	}).Description("The operation to perform on messages.")

	// TODO: mark the spec as Stable().
	spec := service.NewConfigSpec()
	spec = spec.Categories("Parsing")
	spec = spec.Summary("Converts messages to or from the [MessagePack](https://msgpack.org/) format.")
	spec = spec.Field(operatorField)
	return spec.Version("3.59.0")
}
// init registers the "msgpack" processor, described by processorConfig, with
// the service package. It panics if registration returns an error.
func init() {
	err := service.RegisterProcessor(
		"msgpack", processorConfig(),
		func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
			proc, err := newProcessorFromConfig(conf)
			if err != nil {
				// Return an untyped nil: passing the nil *processor through
				// would yield a non-nil service.Processor interface value
				// wrapping a nil pointer.
				return nil, err
			}
			return proc, nil
		})
	if err != nil {
		panic(err)
	}
}
// msgPackOperator is a conversion function applied to a single message. It
// receives a message and returns either a resulting message or an error.
type msgPackOperator func(m *service.Message) (*service.Message, error)
// strToMsgPackOperator resolves an operator name to its conversion function.
//
// Recognised names:
//   - "to_json": reads the message as bytes, decodes them as MessagePack and
//     stores the decoded value as the structured content of a copy of the
//     message.
//   - "from_json": reads the message's structured content, encodes it as
//     MessagePack and stores the resulting bytes on a copy of the message.
//
// Any other name results in an "operator not recognised" error. Errors from
// decoding, parsing and encoding are wrapped with %w so that callers can
// inspect the underlying cause with errors.Is / errors.As.
func strToMsgPackOperator(opStr string) (msgPackOperator, error) {
	switch opStr {
	case "to_json":
		return func(m *service.Message) (*service.Message, error) {
			mBytes, err := m.AsBytes()
			if err != nil {
				return nil, err
			}

			var jObj interface{}
			if err := msgpack.Unmarshal(mBytes, &jObj); err != nil {
				return nil, fmt.Errorf("failed to convert MsgPack document to JSON: %w", err)
			}

			// The result is written to a copy of the message rather than to m.
			resMsg := m.Copy()
			resMsg.SetStructured(jObj)
			return resMsg, nil
		}, nil
	case "from_json":
		return func(m *service.Message) (*service.Message, error) {
			jObj, err := m.AsStructured()
			if err != nil {
				return nil, fmt.Errorf("failed to parse message as JSON: %w", err)
			}

			b, err := msgpack.Marshal(jObj)
			if err != nil {
				return nil, fmt.Errorf("failed to convert JSON to MsgPack: %w", err)
			}

			// The result is written to a copy of the message rather than to m.
			resMsg := m.Copy()
			resMsg.SetBytes(b)
			return resMsg, nil
		}, nil
	}
	return nil, fmt.Errorf("operator not recognised: %v", opStr)
}
//------------------------------------------------------------------------------
// processor applies a single msgPackOperator to each message passed to
// Process.
type processor struct {
// operator is the conversion function invoked by Process.
operator msgPackOperator
}
// newProcessorFromConfig reads the "operator" string field from the parsed
// configuration and builds a processor for it via newProcessor. An error from
// reading the field is returned as-is with a nil processor.
func newProcessorFromConfig(conf *service.ParsedConfig) (*processor, error) {
	opName, err := conf.FieldString("operator")
	if err == nil {
		return newProcessor(opName)
	}
	return nil, err
}
// newProcessor creates a processor whose operator is the conversion function
// that strToMsgPackOperator returns for operatorStr. If the name cannot be
// resolved, the resolution error is returned with a nil processor.
func newProcessor(operatorStr string) (*processor, error) {
	op, err := strToMsgPackOperator(operatorStr)
	if err != nil {
		return nil, err
	}

	p := new(processor)
	p.operator = op
	return p, nil
}
// Process runs the configured operator on msg and returns the converted
// message as a batch containing exactly that one message. If the operator
// fails, its error is returned with a nil batch.
func (p *processor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
	converted, opErr := p.operator(msg)
	if opErr != nil {
		return nil, opErr
	}

	batch := service.MessageBatch{converted}
	return batch, nil
}
// Close does no work and always returns nil; ctx is not used.
func (p *processor) Close(ctx context.Context) error {
return nil
}