forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
modeutil.go
127 lines (111 loc) · 2.94 KB
/
modeutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package modeutil
import (
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/lb"
"github.com/elastic/beats/libbeat/outputs/mode/single"
)
type ClientFactory func(host string) (mode.ProtocolClient, error)
type AsyncClientFactory func(string) (mode.AsyncProtocolClient, error)
func NewConnectionMode(
clients []mode.ProtocolClient,
failover bool,
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
) (mode.ConnectionMode, error) {
if failover {
clients = NewFailoverClient(clients)
}
if len(clients) == 1 {
return single.New(clients[0], maxAttempts, waitRetry, timeout, maxWaitRetry)
}
return lb.NewSync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry)
}
func NewAsyncConnectionMode(
clients []mode.AsyncProtocolClient,
failover bool,
maxAttempts int,
waitRetry, timeout, maxWaitRetry time.Duration,
) (mode.ConnectionMode, error) {
if failover {
clients = NewAsyncFailoverClient(clients)
}
return lb.NewAsync(clients, maxAttempts, waitRetry, timeout, maxWaitRetry)
}
// MakeClients will create a list from of ProtocolClient instances from
// outputer configuration host list and client factory function.
func MakeClients(
config *common.Config,
newClient ClientFactory,
) ([]mode.ProtocolClient, error) {
hosts, err := ReadHostList(config)
if err != nil {
return nil, err
}
if len(hosts) == 0 {
return nil, mode.ErrNoHostsConfigured
}
clients := make([]mode.ProtocolClient, 0, len(hosts))
for _, host := range hosts {
client, err := newClient(host)
if err != nil {
// on error destroy all client instance created
for _, client := range clients {
_ = client.Close() // ignore error
}
return nil, err
}
clients = append(clients, client)
}
return clients, nil
}
func MakeAsyncClients(
config *common.Config,
newClient AsyncClientFactory,
) ([]mode.AsyncProtocolClient, error) {
hosts, err := ReadHostList(config)
if err != nil {
return nil, err
}
if len(hosts) == 0 {
return nil, mode.ErrNoHostsConfigured
}
clients := make([]mode.AsyncProtocolClient, 0, len(hosts))
for _, host := range hosts {
client, err := newClient(host)
if err != nil {
// on error destroy all client instance created
for _, client := range clients {
_ = client.Close() // ignore error
}
return nil, err
}
clients = append(clients, client)
}
return clients, nil
}
func ReadHostList(cfg *common.Config) ([]string, error) {
config := struct {
Hosts []string `config:"hosts"`
Worker int `config:"worker"`
}{
Worker: 1,
}
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
lst := config.Hosts
if len(lst) == 0 || config.Worker <= 1 {
return lst, nil
}
// duplicate entries config.Workers times
hosts := make([]string, 0, len(lst)*config.Worker)
for _, entry := range lst {
for i := 0; i < config.Worker; i++ {
hosts = append(hosts, entry)
}
}
return hosts, nil
}