forked from DataDog/datadog-agent
-
Notifications
You must be signed in to change notification settings - Fork 2
/
pipeline.go
128 lines (108 loc) · 5.33 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package pipeline
import (
"context"
"github.com/StackVista/stackstate-agent/pkg/logs/client"
"github.com/StackVista/stackstate-agent/pkg/logs/client/http"
"github.com/StackVista/stackstate-agent/pkg/logs/client/tcp"
"github.com/StackVista/stackstate-agent/pkg/logs/config"
"github.com/StackVista/stackstate-agent/pkg/logs/diagnostic"
"github.com/StackVista/stackstate-agent/pkg/logs/message"
"github.com/StackVista/stackstate-agent/pkg/logs/processor"
"github.com/StackVista/stackstate-agent/pkg/logs/sender"
"github.com/StackVista/stackstate-agent/pkg/security/log"
)
// Pipeline processes and sends messages to the backend
type Pipeline struct {
	// InputChan receives the messages to be processed; producers write here.
	InputChan chan *message.Message
	// processor encodes messages (per the configured encoder) and forwards
	// them to the sender's channel.
	processor *processor.Processor
	// sender ships encoded messages to the configured destinations
	// (single or dual, depending on reliable additional endpoints).
	sender sender.Sender
}
// NewPipeline returns a new Pipeline wired from input channel through the
// processor to one (or, when dual-shipping, two) sender(s) feeding outputChan.
func NewPipeline(outputChan chan *message.Message, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, diagnosticMessageReceiver diagnostic.MessageReceiver, serverless bool, pipelineID int) *Pipeline {
	mainDests := getMainDestinations(endpoints, destinationsContext)
	reliableDests := getReliableAdditionalDestinations(endpoints, destinationsContext)

	senderChan := make(chan *message.Message, config.ChanSize)

	var logSender sender.Sender
	if reliableDests == nil {
		logSender = sender.NewSingleSender(senderChan, outputChan, mainDests, getStrategy(endpoints, serverless, pipelineID))
	} else {
		// A reliable additional endpoint means we are dual-shipping, so an
		// extra sender is spawned and both are driven by a dual sender.
		primary := sender.NewSingleSender(make(chan *message.Message, config.ChanSize), outputChan, mainDests, getStrategy(endpoints, serverless, pipelineID))
		secondary := sender.NewSingleSender(make(chan *message.Message, config.ChanSize), outputChan, reliableDests, getStrategy(endpoints, serverless, pipelineID))
		logSender = sender.NewDualSender(senderChan, primary, secondary)
	}

	// Pick the wire encoding to match the transport in use.
	var encoder processor.Encoder
	switch {
	case serverless:
		encoder = processor.JSONServerlessEncoder
	case endpoints.UseHTTP:
		encoder = processor.JSONEncoder
	case endpoints.UseProto:
		encoder = processor.ProtoEncoder
	default:
		encoder = processor.RawEncoder
	}

	inputChan := make(chan *message.Message, config.ChanSize)
	proc := processor.New(inputChan, senderChan, processingRules, encoder, diagnosticMessageReceiver)

	return &Pipeline{
		InputChan: inputChan,
		processor: proc,
		sender:    logSender,
	}
}
// Start launches the pipeline
func (p *Pipeline) Start() {
	// Start downstream first: the sender must be ready to drain its channel
	// before the processor begins producing into it.
	p.sender.Start()
	p.processor.Start()
}
// Stop stops the pipeline
func (p *Pipeline) Stop() {
	// Stop upstream first: the processor stops feeding the sender, then the
	// sender shuts down (mirror order of Start).
	p.processor.Stop()
	p.sender.Stop()
}
// Flush flushes synchronously the processor and sender managed by this pipeline.
// The ctx bounds how long each flush may take.
func (p *Pipeline) Flush(ctx context.Context) {
	p.processor.Flush(ctx) // flush messages in the processor into the sender
	p.sender.Flush(ctx)    // flush the sender
}
// getMainDestinations builds the destination set — the main endpoint plus any
// unreliable additional endpoints — for whichever transport (HTTP or TCP) the
// endpoints config selects.
func getMainDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext) *client.Destinations {
	unreliable := endpoints.GetUnReliableAdditionals()

	if endpoints.UseHTTP {
		extras := []client.Destination{}
		for _, e := range unreliable {
			extras = append(extras, http.NewDestination(e, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend))
		}
		primary := http.NewDestination(endpoints.Main, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend)
		return client.NewDestinations(primary, extras)
	}

	extras := []client.Destination{}
	for _, e := range unreliable {
		extras = append(extras, tcp.NewDestination(e, endpoints.UseProto, destinationsContext))
	}
	primary := tcp.NewDestination(endpoints.Main, endpoints.UseProto, destinationsContext)
	return client.NewDestinations(primary, extras)
}
// getReliableAdditionalDestinations returns a destination set for the first
// additional endpoint marked reliable, or nil when no such endpoint exists.
func getReliableAdditionalDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext) *client.Destinations {
	reliable := endpoints.GetReliableAdditionals()
	if len(reliable) == 0 {
		return nil
	}

	log.Infof("Found an additional reliable endpoint. Only the first additional endpoint marked as reliable will be used at this time.")
	chosen := reliable[0]

	if endpoints.UseHTTP {
		backup := http.NewDestination(chosen, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend)
		return client.NewDestinations(backup, []client.Destination{})
	}
	backup := tcp.NewDestination(chosen, endpoints.UseProto, destinationsContext)
	return client.NewDestinations(backup, []client.Destination{})
}
// getStrategy selects the send strategy: batching for HTTP or serverless
// pipelines, streaming for plain TCP.
func getStrategy(endpoints *config.Endpoints, serverless bool, pipelineID int) sender.Strategy {
	if !endpoints.UseHTTP && !serverless {
		return sender.StreamStrategy
	}
	return sender.NewBatchStrategy(sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", pipelineID)
}