forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
modeutil.go
136 lines (118 loc) · 3.05 KB
/
modeutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package modeutil
import (
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/lb"
"github.com/elastic/beats/libbeat/outputs/mode/single"
)
// ClientFactory is a constructor callback that receives a single host string
// and returns a mode.ProtocolClient for it, or an error. MakeClients invokes
// it once per entry of the host list.
type ClientFactory func(host string) (mode.ProtocolClient, error)
// AsyncClientFactory is a constructor callback that receives a single host
// string and returns a mode.AsyncProtocolClient for it, or an error.
// MakeAsyncClients invokes it once per entry of the host list.
type AsyncClientFactory func(string) (mode.AsyncProtocolClient, error)
// Settings bundles the options consumed by NewConnectionMode and
// NewAsyncConnectionMode when they construct a connection mode.
type Settings struct {
// Failover, when true, makes the constructors wrap the client list with
// NewFailoverClient / NewAsyncFailoverClient before building the mode.
Failover bool
// MaxAttempts is forwarded unchanged to the underlying mode constructor.
// NOTE(review): presumably the maximum number of send attempts - confirm
// against the single/lb packages.
MaxAttempts int
// WaitRetry is forwarded unchanged to the underlying mode constructor.
// NOTE(review): presumably the initial wait between retries - confirm.
WaitRetry time.Duration
// Timeout is forwarded unchanged to the underlying mode constructor.
// NOTE(review): exact scope of the timeout is not visible here - confirm.
Timeout time.Duration
// MaxWaitRetry is forwarded unchanged to the underlying mode constructor.
// NOTE(review): presumably the upper bound for the retry wait - confirm.
MaxWaitRetry time.Duration
}
// NewConnectionMode builds a synchronous mode.ConnectionMode from the given
// clients.
//
// If s.Failover is set, the client list is first replaced by the result of
// NewFailoverClient. Afterwards, a list holding exactly one client is handed
// to single.New; any other list is handed to lb.NewSync. The retry and
// timeout values from s are passed through unchanged in both cases.
func NewConnectionMode(
	clients []mode.ProtocolClient,
	s Settings,
) (mode.ConnectionMode, error) {
	if s.Failover {
		clients = NewFailoverClient(clients)
	}

	if len(clients) != 1 {
		return lb.NewSync(clients, s.MaxAttempts, s.WaitRetry, s.Timeout, s.MaxWaitRetry)
	}
	return single.New(clients[0], s.MaxAttempts, s.WaitRetry, s.Timeout, s.MaxWaitRetry)
}
// NewAsyncConnectionMode builds a mode.ConnectionMode from asynchronous
// clients via lb.NewAsync.
//
// If s.Failover is set, the client list is first replaced by the result of
// NewAsyncFailoverClient. The retry and timeout values from s are passed
// through unchanged.
func NewAsyncConnectionMode(
	clients []mode.AsyncProtocolClient,
	s Settings,
) (mode.ConnectionMode, error) {
	targets := clients
	if s.Failover {
		targets = NewAsyncFailoverClient(clients)
	}

	return lb.NewAsync(
		targets,
		s.MaxAttempts,
		s.WaitRetry,
		s.Timeout,
		s.MaxWaitRetry,
	)
}
// MakeClients creates one mode.ProtocolClient per entry of the host list read
// from the outputer configuration, using the supplied client factory.
//
// It returns the error from ReadHostList if the host list cannot be read, and
// mode.ErrNoHostsConfigured if the list is empty. If the factory fails for any
// host, every client created so far is closed (close errors are ignored) and
// the factory error is returned.
func MakeClients(
	config *common.Config,
	newClient ClientFactory,
) ([]mode.ProtocolClient, error) {
	hosts, err := ReadHostList(config)
	switch {
	case err != nil:
		return nil, err
	case len(hosts) == 0:
		return nil, mode.ErrNoHostsConfigured
	}

	created := make([]mode.ProtocolClient, 0, len(hosts))
	for i := range hosts {
		c, cerr := newClient(hosts[i])
		if cerr == nil {
			created = append(created, c)
			continue
		}

		// Roll back: release every client built before the failure.
		for j := range created {
			_ = created[j].Close() // best effort, close error is ignored
		}
		return nil, cerr
	}
	return created, nil
}
// MakeAsyncClients creates one mode.AsyncProtocolClient per entry of the host
// list read from the outputer configuration, using the supplied client
// factory.
//
// It returns the error from ReadHostList if the host list cannot be read, and
// mode.ErrNoHostsConfigured if the list is empty. If the factory fails for any
// host, every client created so far is closed (close errors are ignored) and
// the factory error is returned.
func MakeAsyncClients(
	config *common.Config,
	newClient AsyncClientFactory,
) ([]mode.AsyncProtocolClient, error) {
	hosts, err := ReadHostList(config)
	switch {
	case err != nil:
		return nil, err
	case len(hosts) == 0:
		return nil, mode.ErrNoHostsConfigured
	}

	created := make([]mode.AsyncProtocolClient, 0, len(hosts))
	for i := range hosts {
		c, cerr := newClient(hosts[i])
		if cerr == nil {
			created = append(created, c)
			continue
		}

		// Roll back: release every client built before the failure.
		for j := range created {
			_ = created[j].Close() // best effort, close error is ignored
		}
		return nil, cerr
	}
	return created, nil
}
// ReadHostList unpacks the "hosts" and "worker" settings from cfg and returns
// the resulting host list.
//
// The worker count defaults to 1 before unpacking. If the host list is empty
// or the worker count is at most 1, the configured hosts are returned as-is.
// Otherwise every host appears worker times in a row, in the configured
// order. Any error from cfg.Unpack is returned unchanged.
func ReadHostList(cfg *common.Config) ([]string, error) {
	var settings struct {
		Hosts  []string `config:"hosts" validate:"required"`
		Worker int      `config:"worker" validate:"min=1"`
	}
	settings.Worker = 1

	if err := cfg.Unpack(&settings); err != nil {
		return nil, err
	}

	workers := settings.Worker
	if workers <= 1 || len(settings.Hosts) == 0 {
		return settings.Hosts, nil
	}

	// Repeat each host once per worker, keeping duplicates adjacent.
	expanded := make([]string, 0, len(settings.Hosts)*workers)
	for _, host := range settings.Hosts {
		for n := workers; n > 0; n-- {
			expanded = append(expanded, host)
		}
	}
	return expanded, nil
}