forked from benthosdev/benthos
/
output_socket.go
135 lines (118 loc) · 3.36 KB
/
output_socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package io
import (
"context"
"fmt"
"net"
"sync"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/codec"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/component/output/processors"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
)
func init() {
err := bundle.AllOutputs.Add(processors.WrapConstructor(func(c output.Config, nm bundle.NewManagement) (output.Streamed, error) {
return newSocketOutput(c, nm, nm.Logger(), nm.Metrics())
}), docs.ComponentSpec{
Name: "socket",
Summary: `Connects to a (tcp/udp/unix) server and sends a continuous stream of data, dividing messages according to the specified codec.`,
Config: docs.FieldComponent().WithChildren(
docs.FieldString("network", "The network type to connect as.").HasOptions(
"unix", "tcp", "udp",
),
docs.FieldString("address", "The address (or path) to connect to.", "/tmp/benthos.sock", "localhost:9000"),
codec.WriterDocs,
).ChildDefaultAndTypesFromStruct(output.NewSocketConfig()),
Categories: []string{
"Network",
},
})
if err != nil {
panic(err)
}
}
// newSocketOutput builds a socket writer from conf.Socket and wraps it in an
// async writer registered under the name "socket".
//
// The stats argument is accepted but not used by this function. Any error
// from constructing the socket writer is returned unchanged.
func newSocketOutput(conf output.Config, mgr bundle.NewManagement, log log.Modular, stats metrics.Type) (output.Streamed, error) {
	sockWriter, err := newSocketWriter(conf.Socket, mgr, log)
	if err != nil {
		return nil, err
	}

	return output.NewAsyncWriter("socket", 1, sockWriter, mgr)
}
// socketWriter sends messages over a dialled socket connection, framing them
// with a codec writer that is created on Connect.
type socketWriter struct {
	// Static configuration, set once by newSocketWriter.
	network   string                  // network name handed to the dialler
	address   string                  // address (or path) handed to the dialler
	codec     codec.WriterConstructor // builds a codec.Writer around a new connection
	codecConf codec.WriterConfig      // codec settings; CloseAfter is consulted on every write
	log       log.Modular

	// writerMut guards writer. writer is nil while disconnected.
	writerMut sync.Mutex
	writer    codec.Writer
}
// newSocketWriter validates the socket configuration and returns a writer
// that is not yet connected.
//
// Only the "tcp", "udp" and "unix" networks are accepted; any other value
// yields an error. An error from resolving the configured codec is returned
// unchanged. The mgr argument is accepted but not used by this function.
func newSocketWriter(conf output.SocketConfig, mgr bundle.NewManagement, log log.Modular) (*socketWriter, error) {
	if n := conf.Network; n != "tcp" && n != "udp" && n != "unix" {
		return nil, fmt.Errorf("socket network '%v' is not supported by this output", conf.Network)
	}

	writerCtor, writerConf, err := codec.GetWriter(conf.Codec)
	if err != nil {
		return nil, err
	}

	return &socketWriter{
		network:   conf.Network,
		address:   conf.Address,
		codec:     writerCtor,
		codecConf: writerConf,
		log:       log,
	}, nil
}
// Connect dials the configured network/address and wraps the resulting
// connection in a codec writer. It is a no-op when a writer is already
// established.
//
// The dial honours ctx, so a cancelled or expired context aborts a pending
// connection attempt. Errors from dialling or from constructing the codec
// writer are returned unchanged; in the latter case the connection is closed
// and the receiver is left disconnected.
func (s *socketWriter) Connect(ctx context.Context) error {
	s.writerMut.Lock()
	defer s.writerMut.Unlock()

	if s.writer != nil {
		return nil
	}

	var dialer net.Dialer
	conn, err := dialer.DialContext(ctx, s.network, s.address)
	if err != nil {
		return err
	}

	// Build the writer into a local first so that a failed construction can
	// never leave a stale value in s.writer, which would make later Connect
	// calls a no-op.
	w, err := s.codec(conn)
	if err != nil {
		conn.Close()
		return err
	}
	s.writer = w

	s.log.Infof("Sending messages over %v socket to: %s\n", s.network, s.address)
	return nil
}
// WriteBatch writes every part of msg, in iteration order, to the codec
// writer that was current when the call started.
//
// It returns component.ErrNotConnected when no writer is established. When a
// write fails, or when the codec configuration has CloseAfter set, the shared
// writer is closed and cleared so that a subsequent Connect establishes a new
// one. The error returned for a part is always the write error (nil on
// success); errors from closing the writer are not reported.
func (s *socketWriter) WriteBatch(ctx context.Context, msg message.Batch) error {
	s.writerMut.Lock()
	w := s.writer
	s.writerMut.Unlock()

	if w == nil {
		return component.ErrNotConnected
	}

	return msg.Iter(func(i int, part *message.Part) error {
		serr := w.Write(ctx, part)
		if serr != nil || s.codecConf.CloseAfter {
			s.writerMut.Lock()
			// s.writer may already be nil at this point: an earlier part of
			// this same batch (with CloseAfter set) or a concurrent Close
			// can have cleared it. Calling Close on the nil interface would
			// panic, so only close when a writer is still registered.
			if s.writer != nil {
				// Best-effort close: the write error, if any, is what the
				// caller needs to see.
				_ = s.writer.Close(ctx)
				s.writer = nil
			}
			s.writerMut.Unlock()
		}
		return serr
	})
}
// Close shuts down the current codec writer, if there is one, and marks the
// receiver as disconnected. It returns the error reported by the writer's
// Close, or nil when there was nothing to close.
//
// The writer is closed with a background context rather than ctx.
func (s *socketWriter) Close(ctx context.Context) error {
	s.writerMut.Lock()
	defer s.writerMut.Unlock()

	if s.writer == nil {
		return nil
	}

	closeErr := s.writer.Close(context.Background())
	s.writer = nil
	return closeErr
}