/
traceability.go
446 lines (389 loc) · 12.6 KB
/
traceability.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
package traceability
import (
"context"
"fmt"
"math/rand"
"net/url"
"os"
"path"
"reflect"
"sync"
"unsafe"
"github.com/Axway/agent-sdk/pkg/agent"
"github.com/Axway/agent-sdk/pkg/jobs"
"github.com/Axway/agent-sdk/pkg/traceability/sampling"
"github.com/Axway/agent-sdk/pkg/util"
"github.com/Axway/agent-sdk/pkg/util/log"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/paths"
"github.com/elastic/beats/v7/libbeat/publisher"
"golang.org/x/net/proxy"
hc "github.com/Axway/agent-sdk/pkg/util/healthcheck"
)
// Log field keys used by Client.Publish.
const (
// countStr is the field key for the number of events being published.
countStr = "count"
// eventTypeStr is the field key whose value is "metric" or "transaction".
eventTypeStr = "event-type"
)
// OutputEventProcessor - processes traceability events before they are
// published (installed with SetOutputEventProcessor). Client.Publish calls it
// only for non-metric batches.
type OutputEventProcessor interface {
// Process receives the batch's events. In online mode the returned slice
// replaces the batch's events before sampling; in offline mode the return
// value is ignored.
Process(events []publisher.Event) []publisher.Event
}
var (
	// outputEventProcessor, when non-nil, is applied by Client.Publish to
	// non-metric events; it is installed with SetOutputEventProcessor.
	outputEventProcessor OutputEventProcessor

	// pathDataMutex serializes access to libbeat's global data directory path
	// (see GetDataDirPath / SetDataDirPath). The zero value is ready to use.
	pathDataMutex sync.Mutex
)
// Traceability output constants.
const (
// minWindowSize, defaultStartMaxWindowSize and defaultPort are not referenced
// in this file; presumably they are used elsewhere in the package.
minWindowSize int = 1
defaultStartMaxWindowSize int = 10
defaultPort = 5044
// traceabilityStr is the libbeat output type name registered in init; it is
// also the value returned by Client.String.
traceabilityStr = "traceability"
// HealthCheckEndpoint - the endpoint name the traceability health check is
// registered under by registerHealthCheckers.
HealthCheckEndpoint = traceabilityStr
)
// traceabilityClients records every Client created by makeTraceabilityAgent so
// getClient can hand one out; it is guarded by clientMutex.
var traceabilityClients []*Client
// clientMutex guards traceabilityClients; it is allocated in init.
var clientMutex *sync.Mutex
// traceCfg is the configuration read by makeTraceabilityAgent; it is also read
// by ingestionSingleEntryDialer when libbeat builds the sni dialer.
var traceCfg *Config
// GetClient - returns a random client from the clients array
// It is a variable bound to getClient, presumably so it can be substituted
// (e.g. in tests) — NOTE(review): confirm intended use.
var GetClient = getClient
// addClient appends c to the package-wide list that getClient draws from.
// The list is modified while holding clientMutex.
func addClient(c *Client) {
	clientMutex.Lock()
	defer clientMutex.Unlock()

	updated := append(traceabilityClients, c)
	traceabilityClients = updated
}
// getClient picks one of the registered traceability clients while holding
// clientMutex. A lone client is returned directly; with several, one is chosen
// at random with math/rand. An error is returned when none are registered.
func getClient() (*Client, error) {
	clientMutex.Lock()
	defer clientMutex.Unlock()

	count := len(traceabilityClients)
	if count == 0 {
		return nil, fmt.Errorf("no traceability clients, can't publish metrics")
	}

	idx := 0
	if count > 1 {
		idx = rand.Intn(count)
	}
	return traceabilityClients[idx], nil
}
// Client - the outputs.Client handed to libbeat for the traceability output.
// It wraps a transport client (the logstash or HTTP client built by
// makeTraceabilityAgent) and, in Publish, applies offline-mode handling, the
// output event processor and sampling before delegating to it.
type Client struct {
// Mutex guards transportClient (SetTransportClient / getTransportClient).
sync.Mutex
// transportClient is the wrapped libbeat client that actually sends events.
transportClient outputs.Client
// logger is the base logger Publish derives its per-batch loggers from.
logger log.FieldLogger
}
// init allocates the lock for the client list and registers the
// "traceability" output type with libbeat, backed by makeTraceabilityAgent.
func init() {
	clientMutex = new(sync.Mutex)
	outputs.RegisterType(traceabilityStr, makeTraceabilityAgent)
}
// SetOutputEventProcessor - sets the processor that Client.Publish applies to
// non-metric (transaction) events. A nil processor disables this step.
// NOTE(review): the package variable is written without synchronization, so
// this should be called before publishing starts.
func SetOutputEventProcessor(eventProcessor OutputEventProcessor) {
outputEventProcessor = eventProcessor
}
// GetDataDirPath - Returns the path of the data directory.
// The value is libbeat's global paths.Paths.Data, read while holding
// pathDataMutex so it stays consistent with SetDataDirPath.
func GetDataDirPath() string {
	pathDataMutex.Lock()
	defer pathDataMutex.Unlock()

	dataDir := paths.Paths.Data
	return dataDir
}
// SetDataDirPath - Sets the path of the data directory by updating libbeat's
// global paths.Paths.Data while holding pathDataMutex.
func SetDataDirPath(dataDir string) {
	pathDataMutex.Lock()
	defer pathDataMutex.Unlock()

	// Named dataDir (not "path") so it does not shadow the path package.
	paths.Paths.Data = dataDir
}
// createDirIfNotExist creates dirPath, including any missing parents, when it
// does not exist yet.
//
// The new directory gets the same permission bits as the data directory. If
// the data directory cannot be stat'ed (for example it does not exist yet),
// 0755 is used instead of dereferencing the nil os.FileInfo that os.Stat
// returns alongside an error.
//
// Creation is best effort. The function has no error return, so a failure here
// surfaces when the caller later uses the directory.
func createDirIfNotExist(dirPath string) {
	if _, err := os.Stat(dirPath); !os.IsNotExist(err) {
		return
	}

	perm := os.FileMode(0755)
	if dataInfo, err := os.Stat(GetDataDirPath()); err == nil {
		// Create the directory with the same permissions as the data dir.
		perm = dataInfo.Mode().Perm()
	}

	// Deliberately best effort; see the function comment.
	_ = os.MkdirAll(dirPath, perm)
}
// GetCacheDirPath - Returns the "cache" sub-directory of the data directory,
// creating it first if it does not exist yet.
func GetCacheDirPath() string {
	dataDir := GetDataDirPath()
	cacheDir := path.Join(dataDir, "cache")
	createDirIfNotExist(cacheDir)
	return cacheDir
}
// GetReportsDirPath - Returns the "reports" sub-directory of the data
// directory, creating it first if it does not exist yet.
func GetReportsDirPath() string {
	dataDir := GetDataDirPath()
	reportDir := path.Join(dataDir, "reports")
	createDirIfNotExist(reportDir)
	return reportDir
}
// makeTraceabilityAgent is the libbeat output factory registered for the
// "traceability" output type (see init).
//
// It reads the traceability configuration and host list, then builds the
// transport group:
//   - an HTTP client when the HTTP transport is selected and no single entry
//     point is configured;
//   - otherwise libbeat's logstash client, with an "sni" proxy dialer in front
//     of it when a single entry point is configured.
//
// Every transport client is wrapped in a *Client, which is also recorded for
// getClient.
//
// Configuration errors mark the agent as failed before returning through
// outputs.Fail. These cover an unreadable config or host list, and, for single
// entry, missing hosts or an invalid proxy.
func makeTraceabilityAgent(
	indexManager outputs.IndexManager,
	beat beat.Info,
	observer outputs.Observer,
	libbeatCfg *common.Config,
) (outputs.Group, error) {
	logger := log.NewFieldLogger().
		WithPackage("sdk.traceability").
		WithComponent("makeTraceabilityAgent")

	var err error
	traceCfg, err = readConfig(libbeatCfg, beat)
	if err != nil {
		agent.UpdateStatusWithPrevious(agent.AgentFailed, agent.AgentRunning, err.Error())
		return outputs.Fail(err)
	}

	hosts, err := outputs.ReadHostList(libbeatCfg)
	if err != nil {
		agent.UpdateStatusWithPrevious(agent.AgentFailed, agent.AgentRunning, err.Error())
		return outputs.Fail(err)
	}

	var transportGroup outputs.Group
	logger.Tracef("initializing traceability client using config: %+v, host: %+v", traceCfg, hosts)
	isSingleEntry := agent.GetCentralConfig().GetSingleURL() != ""
	if !isSingleEntry && IsHTTPTransport() {
		transportGroup, err = makeHTTPClient(beat, observer, traceCfg, hosts)
	} else {
		// For Single entry point register dialer factory for sni scheme and set the
		// proxy url with sni scheme. When libbeat will register its dialer and sees
		// proxy url with sni scheme, it will invoke the factory to construct the dialer
		// The dialer will be invoked as proxy dialer in the libbeat dialer chain
		// (proxy dialer, stat dialer, tls dialer).
		if isSingleEntry {
			// The sni proxy URL below and the single entry host mapping are both
			// keyed on the first traceability host, so one is required.
			if len(traceCfg.Hosts) == 0 {
				err = fmt.Errorf("no traceability hosts configured, at least one is required for the single entry point")
				agent.UpdateStatusWithPrevious(agent.AgentFailed, agent.AgentRunning, err.Error())
				return outputs.Fail(err)
			}
			if IsHTTPTransport() {
				traceCfg.Protocol = "tcp"
				logger.Warn("switching to tcp protocol instead of http because agent will use single entry endpoint")
			}
			// Register dialer factory with sni scheme for single entry point
			proxy.RegisterDialerType("sni", ingestionSingleEntryDialer)
			// If a real proxy is configured (not the sni proxy set here), validate
			// it here, because libbeat's proxy dialer will not be invoked for it.
			// A validation failure aborts the output setup.
			if traceCfg.Proxy.URL != "" {
				proxyCfg := &transport.ProxyConfig{
					URL:          traceCfg.Proxy.URL,
					LocalResolve: traceCfg.Proxy.LocalResolve,
				}
				if err = proxyCfg.Validate(); err != nil {
					agent.UpdateStatusWithPrevious(agent.AgentFailed, agent.AgentRunning, err.Error())
					return outputs.Fail(err)
				}
			}
			// Replace the proxy URL to sni by setting the environment variable
			// Libbeat parses the yaml file and replaces the value from yaml
			// with overridden environment variable.
			// Set the sni host to the ingestion service host to allow the
			// single entry dialer to receive the target address
			if err = os.Setenv("TRACEABILITY_PROXYURL", "sni://"+traceCfg.Hosts[0]); err != nil {
				return outputs.Fail(err)
			}
		}
		transportGroup, err = makeLogstashClient(indexManager, beat, observer, libbeatCfg, traceCfg)
	}
	if err != nil {
		return outputs.Fail(err)
	}

	traceabilityGroup := outputs.Group{
		BatchSize: transportGroup.BatchSize,
		Retry:     transportGroup.Retry,
	}

	clients := make([]outputs.Client, 0, len(transportGroup.Clients))
	logger = logger.WithField("component", "Client")
	for _, client := range transportGroup.Clients {
		outputClient := &Client{
			transportClient: client,
			logger:          logger,
		}
		clients = append(clients, outputClient)
		addClient(outputClient)
	}
	traceabilityGroup.Clients = clients
	return traceabilityGroup, nil
}
// makeLogstashClient builds the transport group using libbeat's registered
// "logstash" output factory. Outside offline usage-reporting mode (and not
// under test), the traceability health check is registered first.
//
// It returns an error when no "logstash" output factory is registered. An
// empty group with a nil error would leave the traceability output without
// any clients and no indication why.
func makeLogstashClient(indexManager outputs.IndexManager,
	beat beat.Info,
	observer outputs.Observer,
	libbeatCfg *common.Config,
	traceCfg *Config,
) (outputs.Group, error) {
	factory := outputs.FindFactory("logstash")
	if factory == nil {
		return outputs.Group{}, fmt.Errorf("logstash output type is not registered, can't create traceability transport")
	}

	// only run the health check if in online mode
	if !agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() && util.IsNotTest() {
		if err := registerHealthCheckers(traceCfg); err != nil {
			return outputs.Group{}, err
		}
	}

	return factory(indexManager, beat, observer, libbeatCfg)
}
// ingestionSingleEntryDialer is the proxy dialer factory registered for the
// "sni" scheme by makeTraceabilityAgent. That function overrides the
// traceability proxy URL to sni://<first traceability host>, so libbeat calls
// this factory when it builds its dialer chain.
//
// The returned dialer comes from util.NewDialer and is configured with:
//   - the real proxy from the traceability config (traceCfg.Proxy.URL), if set;
//   - a host map from the first traceability host to the address of the
//     central single entry URL, when both are available.
//
// proxyURL and parentDialer are required by the proxy.RegisterDialerType
// factory signature but are not used.
func ingestionSingleEntryDialer(proxyURL *url.URL, parentDialer proxy.Dialer) (proxy.Dialer, error) {
	var traceProxyURL *url.URL
	if traceCfg != nil && traceCfg.Proxy.URL != "" {
		var err error
		traceProxyURL, err = url.Parse(traceCfg.Proxy.URL)
		if err != nil {
			return nil, fmt.Errorf("proxy could not be parsed. %w", err)
		}
	}

	var singleEntryHostMap map[string]string
	if centralCfg := agent.GetCentralConfig(); centralCfg != nil {
		// cfgSingleURL should not be empty as the factory method is registered based on that check
		cfgSingleURL := centralCfg.GetSingleURL()
		// Guard the host list too: indexing an empty list would panic inside
		// libbeat's dialer construction.
		if cfgSingleURL != "" && traceCfg != nil && len(traceCfg.Hosts) > 0 {
			// A single entry URL that fails to parse leaves the host map nil (best effort).
			if singleEntryURL, err := url.Parse(cfgSingleURL); err == nil {
				singleEntryHostMap = map[string]string{
					traceCfg.Hosts[0]: util.ParseAddr(singleEntryURL),
				}
			}
		}
	}

	return util.NewDialer(traceProxyURL, singleEntryHostMap), nil
}
// makeHTTPClient builds the HTTP transport group. It creates one
// backoff-wrapped HTTP client per host, all sharing the configured TLS
// settings, proxy, timeout and compression level. Outside offline
// usage-reporting mode (and not under test) the traceability health check is
// registered as well.
//
// An empty proxy setting leaves the proxy URL nil (no explicit proxy).
// url.Parse("") succeeds with an empty, non-nil URL, which would otherwise be
// handed to every client as a proxy with no host.
func makeHTTPClient(beat beat.Info, observer outputs.Observer, traceCfg *Config, hosts []string) (outputs.Group, error) {
	tls, err := tlscommon.LoadTLSConfig(traceCfg.TLS)
	if err != nil {
		agent.UpdateStatusWithPrevious(agent.AgentFailed, agent.AgentRunning, err.Error())
		return outputs.Fail(err)
	}

	// The proxy does not depend on the host, so parse it once.
	var proxyURL *url.URL
	if traceCfg.Proxy.URL != "" {
		proxyURL, err = url.Parse(traceCfg.Proxy.URL)
		if err != nil {
			return outputs.Fail(err)
		}
	}

	clients := make([]outputs.NetworkClient, len(hosts))
	for i, host := range hosts {
		hostURL, err := common.MakeURL(traceCfg.Protocol, "/", host, 443)
		if err != nil {
			return outputs.Fail(err)
		}

		var client outputs.NetworkClient
		client, err = NewHTTPClient(HTTPClientSettings{
			BeatInfo:         beat,
			URL:              hostURL,
			Proxy:            proxyURL,
			TLS:              tls,
			Timeout:          traceCfg.Timeout,
			CompressionLevel: traceCfg.CompressionLevel,
			Observer:         observer,
		})
		if err != nil {
			return outputs.Fail(err)
		}
		clients[i] = outputs.WithBackoff(client, traceCfg.Backoff.Init, traceCfg.Backoff.Max)
	}

	if !agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() && util.IsNotTest() {
		if err := registerHealthCheckers(traceCfg); err != nil {
			return outputs.Group{}, err
		}
	}
	return outputs.SuccessNet(traceCfg.LoadBalance, traceCfg.BulkMaxSize, traceCfg.MaxRetries, clients)
}
// SetTransportClient - set the transport client
// The swap happens under the client lock, so it is safe against concurrent
// getTransportClient calls made by Connect, Close and Publish.
func (client *Client) SetTransportClient(outputClient outputs.Client) {
client.Lock()
defer client.Unlock()
client.transportClient = outputClient
}
// getTransportClient - returns the transport client, read under the client
// lock so it is consistent with concurrent SetTransportClient calls.
func (client *Client) getTransportClient() outputs.Client {
client.Lock()
defer client.Unlock()
return client.transportClient
}
// SetLogger - set the base logger that Publish derives its loggers from.
// NOTE(review): unlike SetTransportClient this does not take the client lock,
// and Publish reads the logger without it; avoid calling this concurrently
// with Publish.
func (client *Client) SetLogger(logger log.FieldLogger) {
client.logger = logger
}
// Connect establishes a connection to the clients sink.
//
// In offline usage-reporting mode nothing is connected. A transport client
// that does not implement outputs.NetworkClient has no connection to
// establish, so Connect returns nil for it instead of panicking on a failed
// type assertion. A Client with no transport client set returns an error.
func (client *Client) Connect() error {
	// do not attempt to establish a connection in offline mode
	if agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() {
		return nil
	}

	transportClient := client.getTransportClient()
	if transportClient == nil {
		return fmt.Errorf("traceability client has no transport client to connect")
	}

	networkClient, ok := transportClient.(outputs.NetworkClient)
	if !ok {
		return nil
	}
	return networkClient.Connect()
}
// Close closes the underlying transport client and returns its error.
// In offline usage-reporting mode it does nothing, because Connect never
// established a connection.
func (client *Client) Close() error {
	if agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() {
		// do not attempt to close a connection in offline mode, it was never established
		return nil
	}
	return client.getTransportClient().Close()
}
// Publish sends events to the clients sink.
//
// Flow:
//   - An empty batch is ACKed immediately.
//   - The batch is treated as metrics when the first event's Meta has a
//     "metric" key; otherwise it is a transaction batch.
//   - Offline usage-reporting mode: transaction events are passed to the
//     output event processor (its return value is ignored) and the batch is
//     ACKed without sending anything.
//   - Online: transaction events are run through the output event processor
//     and then sampling, each step replacing the batch's events. If nothing
//     remains the batch is ACKed; otherwise it is handed to the transport
//     client and its error, if any, is logged and returned.
//
// NOTE(review): only events[0] decides whether the whole batch is metrics;
// this assumes batches never mix metrics and transactions — confirm.
func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error {
events := batch.Events()
if len(events) == 0 {
batch.ACK()
return nil // nothing to do
}
// Only the first event's Meta is inspected to classify the whole batch.
_, isMetric := events[0].Content.Meta["metric"]
if agent.GetCentralConfig().GetUsageReportingConfig().IsOfflineMode() {
// Offline: nothing is sent, but transactions still go to the processor.
if outputEventProcessor != nil && !isMetric {
outputEventProcessor.Process(events)
}
batch.ACK()
return nil
}
logger := client.logger.WithField(eventTypeStr, "metric")
if !isMetric {
logger = logger.WithField(eventTypeStr, "transaction")
if outputEventProcessor != nil {
updatedEvents := outputEventProcessor.Process(events)
// Replace the batch contents so the transport publishes the processed events.
updateEvent(batch, updatedEvents)
}
// A sampling error is only logged; the events returned still replace the batch.
sampledEvents, err := sampling.FilterEvents(batch.Events())
if err != nil {
logger.Error(err.Error())
}
updateEvent(batch, sampledEvents)
}
// Re-read: processing and sampling may have removed every event.
events = batch.Events()
if len(events) == 0 {
batch.ACK()
return nil // nothing to do
}
logger = logger.WithField(countStr, len(events))
logger.Info("publishing events")
err := client.getTransportClient().Publish(ctx, batch)
if err != nil {
logger.WithError(err).Error("failed to publish events")
return err
}
logger.Info("published events")
return nil
}
// String - returns traceabilityStr ("traceability"), the output type name
// registered with libbeat in init.
func (client *Client) String() string {
return traceabilityStr
}
// updateEvent - updates the private field events in publisher.Batch
//
// Publish needs to replace a batch's events, and publisher.Batch offers no
// setter for them. This function therefore reaches into the concrete value
// with reflect + unsafe and overwrites its unexported "events" field in place.
//
// NOTE(review): this assumes batch holds a pointer to a struct with an
// unexported field named "events" of type []publisher.Event. That is libbeat's
// internal batch layout; re-check it on beats upgrades. If the field is missing
// or not addressable, UnsafeAddr panics; if its type differs, the unsafe cast
// corrupts memory.
func updateEvent(batch publisher.Batch, events []publisher.Event) {
// Indirect dereferences the pointer held by the interface so the struct's
// fields are addressable.
pointerVal := reflect.ValueOf(batch)
val := reflect.Indirect(pointerVal)
member := val.FieldByName("events")
// An unexported field cannot be Set via reflect, so write through its address.
ptrToEvents := unsafe.Pointer(member.UnsafeAddr())
realPtrToEvents := (*[]publisher.Event)(ptrToEvents)
*realPtrToEvents = events
}
// registerHealthCheckers schedules the traceability health-check job to run
// every config.Timeout and registers its check under HealthCheckEndpoint.
// The first registration error is returned.
func registerHealthCheckers(config *Config) error {
	job := newTraceabilityHealthCheckJob()
	if _, err := jobs.RegisterIntervalJobWithName(job, config.Timeout, "Traceability Health Check"); err != nil {
		return err
	}

	_, err := hc.RegisterHealthcheck("Traceability Agent", HealthCheckEndpoint, job.healthcheck)
	return err
}