forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stdin.go
158 lines (136 loc) · 4.21 KB
/
stdin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package input
import (
"context"
"errors"
"io"
"os"
"strings"
"time"
"github.com/dafanshu/benthos/v3/internal/codec"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/message"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeSTDIN] = TypeSpec{
constructor: fromSimpleConstructor(NewSTDIN),
Summary: `
Consumes data piped to stdin as line delimited messages.`,
Description: `
If the multipart option is set to true then lines are interpretted as message
parts, and an empty line indicates the end of the message.
If the delimiter field is left empty then line feed (\n) is used.`,
FieldSpecs: docs.FieldSpecs{
codec.ReaderDocs.AtVersion("3.42.0"),
docs.FieldAdvanced("max_buffer", "The maximum message buffer size. Must exceed the largest message to be consumed."),
docs.FieldDeprecated("delimiter"),
docs.FieldDeprecated("multipart"),
},
Categories: []Category{
CategoryLocal,
},
}
}
//------------------------------------------------------------------------------
// STDINConfig contains config fields for the STDIN input type.
type STDINConfig struct {
Codec string `json:"codec" yaml:"codec"`
Multipart bool `json:"multipart" yaml:"multipart"`
MaxBuffer int `json:"max_buffer" yaml:"max_buffer"`
Delim string `json:"delimiter" yaml:"delimiter"`
}
// NewSTDINConfig creates a STDINConfig populated with default values.
func NewSTDINConfig() STDINConfig {
return STDINConfig{
Codec: "lines",
Multipart: false,
MaxBuffer: 1000000,
Delim: "",
}
}
//------------------------------------------------------------------------------
// NewSTDIN creates a new STDIN input type.
func NewSTDIN(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
rdr, err := newStdinConsumer(conf.STDIN)
if err != nil {
return nil, err
}
return NewAsyncReader(
TypeSTDIN, true,
reader.NewAsyncCutOff(reader.NewAsyncPreserver(rdr)),
log, stats,
)
}
//------------------------------------------------------------------------------
type stdinConsumer struct {
scanner codec.Reader
}
func newStdinConsumer(conf STDINConfig) (*stdinConsumer, error) {
if len(conf.Delim) > 0 {
conf.Codec = "delim:" + conf.Delim
}
if conf.Multipart && !strings.HasSuffix(conf.Codec, "/multipart") {
conf.Codec += "/multipart"
}
codecConf := codec.NewReaderConfig()
codecConf.MaxScanTokenSize = conf.MaxBuffer
ctor, err := codec.GetReader(conf.Codec, codecConf)
if err != nil {
return nil, err
}
scanner, err := ctor("", os.Stdin, func(_ context.Context, err error) error {
return nil
})
if err != nil {
return nil, err
}
return &stdinConsumer{scanner}, nil
}
// ConnectWithContext attempts to establish a connection to the target S3 bucket
// and any relevant queues used to traverse the objects (SQS, etc).
func (s *stdinConsumer) ConnectWithContext(ctx context.Context) error {
return nil
}
// ReadWithContext attempts to read a new message from the target S3 bucket.
func (s *stdinConsumer) ReadWithContext(ctx context.Context) (types.Message, reader.AsyncAckFn, error) {
parts, codecAckFn, err := s.scanner.Next(ctx)
if err != nil {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
err = types.ErrTimeout
}
if err != types.ErrTimeout {
s.scanner.Close(ctx)
}
if errors.Is(err, io.EOF) {
return nil, nil, types.ErrTypeClosed
}
return nil, nil, err
}
_ = codecAckFn(ctx, nil)
msg := message.New(nil)
msg.Append(parts...)
if msg.Len() == 0 {
return nil, nil, types.ErrTimeout
}
return msg, func(rctx context.Context, res types.Response) error {
return nil
}, nil
}
// CloseAsync begins cleaning up resources used by this reader asynchronously.
func (s *stdinConsumer) CloseAsync() {
go func() {
if s.scanner != nil {
s.scanner.Close(context.Background())
}
}()
}
// WaitForClose will block until either the reader is closed or a specified
// timeout occurs.
func (s *stdinConsumer) WaitForClose(time.Duration) error {
return nil
}