/
stdout.go
139 lines (116 loc) · 3.45 KB
/
stdout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package output
import (
"context"
"os"
"time"
"github.com/Jeffail/benthos/v3/internal/codec"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/shutdown"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/output/writer"
"github.com/Jeffail/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
// multipartCodecDoc is the Markdown text used as the Description of the stdout
// output spec registered in init. It explains how batches are terminated when
// written with the lines codec. The raw string literal begins with a newline
// for readability; the trailing [1:] slices that first byte off again.
var multipartCodecDoc = (`
## Batches and Multipart Messages
When writing multipart (batched) messages using the ` + "`lines`" + ` codec the last message ends with double delimiters. E.g. the messages "foo", "bar" and "baz" would be written as:
` + "```" + `
foo\n
bar\n
baz\n
` + "```" + `
Whereas a multipart message [ "foo", "bar", "baz" ] would be written as:
` + "```" + `
foo\n
bar\n
baz\n\n
` + "```" + `
This enables consumers of this output feed to reconstruct the original batches. However, if you wish to avoid this behaviour then add a ` + "[`split` processor](/docs/components/processors/split)" + ` before messages reach this output.`)[1:]
func init() {
Constructors[TypeSTDOUT] = TypeSpec{
constructor: fromSimpleConstructor(NewSTDOUT),
Summary: `
Prints messages to stdout as a continuous stream of data, dividing messages according to the specified codec.`,
Description: multipartCodecDoc,
FieldSpecs: docs.FieldSpecs{
codec.WriterDocs.AtVersion("3.46.0"),
docs.FieldDeprecated("delimiter"),
},
Categories: []Category{
CategoryLocal,
},
}
}
//------------------------------------------------------------------------------
// STDOUTConfig contains configuration fields for the stdout based output type.
type STDOUTConfig struct {
// Codec names the codec used to write messages to stdout. NewSTDOUT passes
// it on to newStdoutWriter, which resolves it with codec.GetWriter.
Codec string `json:"codec" yaml:"codec"`
// Delim is the deprecated "delimiter" field. When it is non-empty NewSTDOUT
// replaces Codec with "delim:" followed by this value.
Delim string `json:"delimiter" yaml:"delimiter"`
}
// NewSTDOUTConfig returns an STDOUTConfig populated with its defaults: the
// "lines" codec and an empty (unset) delimiter.
func NewSTDOUTConfig() STDOUTConfig {
	var conf STDOUTConfig
	conf.Codec = "lines"
	conf.Delim = ""
	return conf
}
//------------------------------------------------------------------------------
// NewSTDOUT creates a new STDOUT output type.
//
// The codec is taken from conf.STDOUT.Codec unless the deprecated
// conf.STDOUT.Delim is set, in which case the codec becomes "delim:" followed
// by that delimiter. The resulting stdout writer is wrapped by NewAsyncWriter
// with a second argument of 1. mgr is accepted but not used here.
//
// An error from either newStdoutWriter or NewAsyncWriter is returned as-is
// alongside a nil output.
func NewSTDOUT(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	codecStr := conf.STDOUT.Codec
	if delim := conf.STDOUT.Delim; delim != "" {
		// The deprecated delimiter field takes precedence over the codec.
		codecStr = "delim:" + delim
	}

	stdoutW, err := newStdoutWriter(codecStr, log, stats)
	if err != nil {
		return nil, err
	}

	out, err := NewAsyncWriter(TypeSTDOUT, 1, stdoutW, log, stats)
	if err != nil {
		return nil, err
	}

	// NOTE(review): SetNoCancel presumably stops in-flight writes from being
	// cancelled on shutdown — confirm against AsyncWriter.
	if asyncOut, isAsync := out.(*AsyncWriter); isAsync {
		asyncOut.SetNoCancel()
	}
	return out, nil
}
// stdoutWriter writes messages to os.Stdout through a codec writer. Instances
// are created by newStdoutWriter.
type stdoutWriter struct {
// handle is the codec writer that newStdoutWriter builds around os.Stdout.
handle codec.Writer
// shutSig is created by newStdoutWriter.
// NOTE(review): no method shown in this file reads it — confirm whether it
// is still needed.
shutSig *shutdown.Signaller
}
// newStdoutWriter builds a stdoutWriter whose codec writer targets os.Stdout.
//
// codecStr is resolved with codec.GetWriter and the resulting constructor is
// applied to os.Stdout; an error from either step is returned unchanged with a
// nil writer. log and stats are accepted but not used.
func newStdoutWriter(codecStr string, log log.Modular, stats metrics.Type) (*stdoutWriter, error) {
	// The second return value of GetWriter is not needed here.
	newWriter, _, err := codec.GetWriter(codecStr)
	if err != nil {
		return nil, err
	}

	stdoutHandle, err := newWriter(os.Stdout)
	if err != nil {
		return nil, err
	}

	w := &stdoutWriter{
		handle:  stdoutHandle,
		shutSig: shutdown.NewSignaller(),
	}
	return w, nil
}
// ConnectWithContext does nothing and always returns nil; the codec writer
// around os.Stdout is already created by newStdoutWriter. ctx is ignored.
func (w *stdoutWriter) ConnectWithContext(ctx context.Context) error {
return nil
}
// WriteWithContext writes every part of msg to stdout through the codec
// writer and, for messages with more than one part, ends the batch so that the
// codec can emit its batch terminator.
//
// It returns the error produced by writer.IterateBatchedSend if writing the
// parts fails, otherwise the error (if any) from ending the batch. Previously
// the EndBatch error was discarded, so a failed terminator write was reported
// as a successful send.
func (w *stdoutWriter) WriteWithContext(ctx context.Context, msg types.Message) error {
	err := writer.IterateBatchedSend(msg, func(i int, p types.Part) error {
		return w.handle.Write(ctx, p)
	})
	if err != nil {
		return err
	}

	// Only multi-part messages are terminated as a batch; single-part
	// messages are written as-is.
	if msg.Len() > 1 && w.handle != nil {
		return w.handle.EndBatch()
	}
	return nil
}
// CloseAsync does nothing: it neither closes the codec writer nor touches
// shutSig.
func (w *stdoutWriter) CloseAsync() {
}
// WaitForClose returns nil immediately without waiting; timeout is ignored.
func (w *stdoutWriter) WaitForClose(timeout time.Duration) error {
return nil
}