forked from DataDog/datadog-agent
-
Notifications
You must be signed in to change notification settings - Fork 2
/
provider.go
114 lines (98 loc) · 4.03 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package pipeline
import (
"context"
"sync/atomic"
"github.com/StackVista/stackstate-agent/pkg/logs/diagnostic"
"github.com/StackVista/stackstate-agent/pkg/logs/auditor"
"github.com/StackVista/stackstate-agent/pkg/logs/client"
"github.com/StackVista/stackstate-agent/pkg/logs/config"
"github.com/StackVista/stackstate-agent/pkg/logs/message"
"github.com/StackVista/stackstate-agent/pkg/logs/restart"
)
// Provider provides message channels
type Provider interface {
	// Start initializes and starts the pipelines; the auditor must already be started.
	Start()
	// Stop stops all pipelines and blocks until they have shut down.
	Stop()
	// NextPipelineChan returns the input channel of the next pipeline
	// (round-robin), or nil when no pipeline is running.
	NextPipelineChan() chan *message.Message
	// Flush flushes all pipeline contained in this Provider
	Flush(ctx context.Context)
}
// provider implements providing logic
type provider struct {
	// numberOfPipelines is how many pipelines Start creates.
	numberOfPipelines int
	// auditor supplies the shared output channel (see Start).
	auditor auditor.Auditor
	// diagnosticMessageReceiver is handed to each pipeline for diagnostics.
	diagnosticMessageReceiver diagnostic.MessageReceiver
	// outputChan is the auditor's channel; set in Start, cleared in Stop.
	outputChan      chan *message.Message
	processingRules []*config.ProcessingRule
	endpoints       *config.Endpoints
	// pipelines holds the running pipelines; emptied on Stop.
	pipelines []*Pipeline
	// currentPipelineIndex is the round-robin counter used by
	// NextPipelineChan; accessed atomically.
	currentPipelineIndex uint32
	destinationsContext  *client.DestinationsContext
	// serverless toggles serverless-specific pipeline behavior.
	serverless bool
}
// NewProvider returns a new Provider
func NewProvider(numberOfPipelines int, auditor auditor.Auditor, diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext) Provider {
	// Regular (non-serverless) mode.
	const serverless = false
	return newProvider(numberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsContext, serverless)
}
// NewServerlessProvider returns a new Provider in serverless mode
func NewServerlessProvider(numberOfPipelines int, auditor auditor.Auditor, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext) Provider {
	// Serverless mode takes no diagnostic consumer, so plug in the no-op receiver.
	receiver := &diagnostic.NoopMessageReceiver{}
	return newProvider(numberOfPipelines, auditor, receiver, processingRules, endpoints, destinationsContext, true)
}
// newProvider builds a provider with no running pipelines; Start must be
// called before the provider can hand out channels.
func newProvider(numberOfPipelines int, auditor auditor.Auditor, diagnosticMessageReceiver diagnostic.MessageReceiver, processingRules []*config.ProcessingRule, endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, serverless bool) Provider {
	p := new(provider)
	p.numberOfPipelines = numberOfPipelines
	p.auditor = auditor
	p.diagnosticMessageReceiver = diagnosticMessageReceiver
	p.processingRules = processingRules
	p.endpoints = endpoints
	p.pipelines = make([]*Pipeline, 0)
	p.destinationsContext = destinationsContext
	p.serverless = serverless
	return p
}
// Start initializes the pipelines
func (p *provider) Start() {
	// This requires the auditor to be started before.
	p.outputChan = p.auditor.Channel()
	for id := 0; id < p.numberOfPipelines; id++ {
		pl := NewPipeline(p.outputChan, p.processingRules, p.endpoints, p.destinationsContext, p.diagnosticMessageReceiver, p.serverless, id)
		pl.Start()
		p.pipelines = append(p.pipelines, pl)
	}
}
// Stop stops all pipelines in parallel,
// this call blocks until all pipelines are stopped
func (p *provider) Stop() {
	group := restart.NewParallelStopper()
	for _, pl := range p.pipelines {
		group.Add(pl)
	}
	// Blocks until every registered pipeline has shut down.
	group.Stop()
	// Reset state so the provider can be Start()ed again.
	p.pipelines = p.pipelines[:0]
	p.outputChan = nil
}
// NextPipelineChan returns the next pipeline input channel
func (p *provider) NextPipelineChan() chan *message.Message {
	count := len(p.pipelines)
	if count == 0 {
		// No pipeline is running (Start not called, or Stop already ran).
		return nil
	}
	// Round-robin: atomically bump the counter, then wrap it into range.
	next := atomic.AddUint32(&p.currentPipelineIndex, 1) % uint32(count)
	return p.pipelines[next].InputChan
}
// Flush flushes synchronously all the contained pipeline of this provider.
// It checks ctx before each pipeline and returns early (leaving the
// remaining pipelines unflushed) once ctx is cancelled or has expired.
func (p *provider) Flush(ctx context.Context) {
	// NOTE: the loop variable must not be named "p" — that would shadow the
	// receiver inside the loop body.
	for _, pipeline := range p.pipelines {
		select {
		case <-ctx.Done():
			// Context cancelled or deadline exceeded: stop flushing.
			return
		default:
			pipeline.Flush(ctx)
		}
	}
}