forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tcp.go
102 lines (89 loc) · 2.77 KB
/
tcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package input
import (
"io"
"net"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeTCP] = TypeSpec{
constructor: fromSimpleConstructor(NewTCP),
Description: `
Connects to a TCP server and consumes a continuous stream of messages.
If multipart is set to false each line of data is read as a separate message. If
multipart is set to true each line is read as a message part, and an empty line
indicates the end of a message.
Messages consumed by this input can be processed in parallel, meaning a single
instance of this input can utilise any number of threads within a
` + "`pipeline`" + ` section of a config.
If the delimiter field is left empty then line feed (\n) is used.`,
Status: docs.StatusDeprecated,
config: docs.FieldComponent().WithChildren(
docs.FieldCommon("address", ""),
docs.FieldCommon("multipart", ""),
docs.FieldCommon("max_buffer", ""),
docs.FieldCommon("delimiter", ""),
),
}
}
//------------------------------------------------------------------------------
// TCPConfig contains configuration values for the TCP input type.
//
// Every field is serialised under the JSON/YAML key given in its struct tag.
type TCPConfig struct {
// Address is the target handed to net.Dial("tcp", ...) by NewTCP.
Address string `json:"address" yaml:"address"`
// Multipart is forwarded to reader.OptLinesSetMultipart. Per the input's
// description: when false each line is a separate message; when true each
// line is a message part and an empty line ends the message.
Multipart bool `json:"multipart" yaml:"multipart"`
// MaxBuffer is forwarded to reader.OptLinesSetMaxBuffer.
// NOTE(review): presumably an upper bound on the line buffer size in bytes;
// confirm against the reader package.
MaxBuffer int `json:"max_buffer" yaml:"max_buffer"`
// Delim is the delimiter that separates lines. NewTCP substitutes "\n" when
// this is left empty.
Delim string `json:"delimiter" yaml:"delimiter"`
}
// NewTCPConfig returns a TCPConfig populated with the default settings:
// address "localhost:4194", multipart disabled, a max buffer of 1000000 and an
// empty delimiter.
func NewTCPConfig() TCPConfig {
	// Start from the zero value: Multipart is already false and Delim is
	// already "", so only the non-zero defaults need assigning.
	var defaults TCPConfig
	defaults.Address = "localhost:4194"
	defaults.MaxBuffer = 1000000
	return defaults
}
//------------------------------------------------------------------------------
// NewTCP creates a new TCP input type.
//
// It logs a deprecation warning, then builds a line reader over a TCP
// connection dialled to conf.TCP.Address and wraps it in an async reader.
// An empty conf.TCP.Delim is replaced with "\n". The error from
// reader.NewLines, if any, is returned unchanged.
func NewTCP(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	log.Warnln("The tcp input is deprecated, please use socket instead.")

	// Default to line feed unless a delimiter was configured.
	delimiter := "\n"
	if conf.TCP.Delim != "" {
		delimiter = conf.TCP.Delim
	}

	// activeConn is shared by both callbacks below so that whichever runs next
	// can release the connection opened by the previous dial.
	var activeConn net.Conn

	// closeConn closes and forgets the current connection, if there is one.
	closeConn := func() {
		if activeConn == nil {
			return
		}
		activeConn.Close()
		activeConn = nil
	}

	// dial drops any previous connection and opens a fresh one.
	dial := func() (io.Reader, error) {
		closeConn()
		var dialErr error
		activeConn, dialErr = net.Dial("tcp", conf.TCP.Address)
		return activeConn, dialErr
	}

	lines, err := reader.NewLines(
		dial,
		closeConn,
		reader.OptLinesSetDelimiter(delimiter),
		reader.OptLinesSetMaxBuffer(conf.TCP.MaxBuffer),
		reader.OptLinesSetMultipart(conf.TCP.Multipart),
	)
	if err != nil {
		return nil, err
	}

	preserved := reader.NewAsyncPreserver(lines)
	return NewAsyncReader(TypeTCP, true, preserved, log, stats)
}
//------------------------------------------------------------------------------