forked from vjeantet/bitfan
/
beats.go
171 lines (137 loc) · 4.19 KB
/
beats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
//go:generate bitfanDoc
package beatsinput
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"
"github.com/awillis/bitfan/processors"
"github.com/elastic/go-lumber/server"
)
func New() processors.Processor {
return &processor{opt: &options{}}
}
type processor struct {
processors.Base
server server.Server
opt *options
}
type options struct {
processors.CommonOptions `mapstructure:",squash"`
// The number of seconds before we raise a timeout,
// this option is useful to control how much time to wait if something is blocking
// the pipeline
CongestionThreshold int
// The IP address to listen on
Host string
// The port to listen on (default 5044)
Port int
// Events are by default send in plain text,
// you can enable encryption by using ssl to true and
// configuring the ssl_certificate and ssl_key options
SSL bool
// SSL certificate to use (path)
SSLCertificate string
// Validate client certificates against theses authorities
// You can defined multiples files or path, all the certificates will be read
// and added to the trust store.
// You need to configure the ssl_verify_mode to peer or force_peer to enable
// the verification.
// This feature only support certificate directly signed by your root ca.
// Intermediate CA are currently not supported.
SSlCertificateAuthorities []string
// SSL key to use (path)
SSlKey string
// SSL key passphrase to use. (not yet implemented)
SSlKeyPassphrase string
// By default the server dont do any client verification,
// peer will make the server ask the client to provide a certificate,
// if the client provide the certificate it will be validated.
// force_peer will make the server ask the client for their certificate,
// if the clients doesn’t provide it the connection will be closed.
// This option need to be used with ssl_certificate_authorities and a defined list of CA.
// Value can be any of: none, peer, force_peer
SSlVerifyMode string
}
func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]interface{}) error {
p.opt.CongestionThreshold = 30
p.opt.Host = "0.0.0.0"
p.opt.Port = 5044
p.opt.SSL = false
p.opt.SSlVerifyMode = "none"
return p.ConfigureAndValidate(ctx, conf, p.opt)
}
func (p *processor) Start(e processors.IPacket) error {
options := make([]server.Option, 0)
if p.opt.SSL == true {
config := new(tls.Config)
// Server Certificates
cert, err := tls.LoadX509KeyPair(p.opt.SSLCertificate, p.opt.SSlKey)
if err != nil {
return fmt.Errorf("error loading keys: %v", err)
}
config.Certificates = []tls.Certificate{cert}
// Certificate authority
if len(p.opt.SSlCertificateAuthorities) > 0 {
config.RootCAs = x509.NewCertPool()
for _, pemCertPath := range p.opt.SSlCertificateAuthorities {
pemCert, err := ioutil.ReadFile(pemCertPath)
if err != nil {
return fmt.Errorf("error loading certificate authorities: %v", err)
}
config.RootCAs.AppendCertsFromPEM(pemCert)
}
}
// SSL Verification mode
if p.opt.SSlVerifyMode == "peer" {
config.ClientAuth = tls.VerifyClientCertIfGiven
}
if p.opt.SSlVerifyMode == "force_peer" {
config.ClientAuth = tls.RequireAndVerifyClientCert
}
options = append(options, server.TLS(config))
}
options = append(options, server.Timeout(time.Second*time.Duration(p.opt.CongestionThreshold)))
svr, err := server.ListenAndServe(fmt.Sprintf("%s:%d", p.opt.Host, p.opt.Port), options...)
if err != nil {
return err
}
p.server = svr
go func(p *processor) {
rchan := p.server.ReceiveChan()
for {
var closed bool
select {
case batch, ok := <-rchan:
if ok {
for _, evt := range batch.Events {
fields := evt.(map[string]interface{})
if val, ok := fields["@timestamp"]; !ok {
fields["@timestamp"] = time.Now()
} else {
fields["@timestamp"], _ = time.Parse(time.RFC3339, val.(string))
}
ev := p.NewPacket(fields)
p.opt.ProcessCommonOptions(ev.Fields())
p.Send(ev)
}
batch.ACK()
} else {
closed = true
}
}
if closed {
break
}
}
}(p)
return nil
}
func (p *processor) Stop(e processors.IPacket) error {
err := p.server.Close()
if err != nil {
return err
}
return nil
}