/
pipeline_group.go
238 lines (205 loc) · 6.81 KB
/
pipeline_group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package otelcolconvert
import (
"cmp"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/pipelines"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
// pipelineGroup groups a set of pipelines together by their telemetry type.
type pipelineGroup struct {
	// Name of the group. May be an empty string.
	Name string

	// Pipeline configurations per telemetry type. createPipelineGroups
	// guarantees each of these is non-nil, substituting an empty
	// PipelineConfig for telemetry types the group does not define.
	Metrics *pipelines.PipelineConfig
	Logs    *pipelines.PipelineConfig
	Traces  *pipelines.PipelineConfig
}
// createPipelineGroups groups pipelines of different telemetry types together
// by the user-specified pipeline name. For example, the following
// configuration creates two groups:
//
//	# (component definitions are omitted for brevity)
//
//	pipelines:
//	  metrics: # ID: metrics/<empty>
//	    receivers: [otlp]
//	    exporters: [otlp]
//	  logs: # ID: logs/<empty>
//	    receivers: [otlp]
//	    exporters: [otlp]
//	  metrics/2: # ID: metrics/2
//	    receivers: [otlp/2]
//	    exporters: [otlp/2]
//	  traces/2: # ID: traces/2
//	    receivers: [otlp/2]
//	    exporters: [otlp/2]
//
// Here, the two groups are [metrics/<empty> logs/<empty>] and [metrics/2
// traces/2]. The key used for grouping is the name of the pipeline, so that
// pipelines with matching names belong to the same group.
//
// This allows us to emit an Alloy-native pipeline, where one component is
// responsible for multiple telemetry types, as opposed to creating the
// otlp/2 receiver two separate times (once for metrics and once for traces).
//
// Note that OpenTelemetry guarantees that the pipeline name is unique, so
// there can't be two pipelines called metrics/2; any given pipeline group is
// guaranteed to contain at most one pipeline of each telemetry type.
//
// The returned groups are sorted by name for deterministic output.
func createPipelineGroups(cfg pipelines.Config) ([]pipelineGroup, error) {
	byName := make(map[string]pipelineGroup)

	for id, pipeline := range cfg {
		name := id.Name()

		entry := byName[name]
		entry.Name = name

		// Assign the pipeline to the slot matching its telemetry type,
		// rejecting duplicates (which OpenTelemetry shouldn't produce).
		switch id.Type() {
		case component.DataTypeMetrics:
			if entry.Metrics != nil {
				return nil, fmt.Errorf("duplicate metrics pipeline for pipeline named %q", name)
			}
			entry.Metrics = pipeline
		case component.DataTypeLogs:
			if entry.Logs != nil {
				return nil, fmt.Errorf("duplicate logs pipeline for pipeline named %q", name)
			}
			entry.Logs = pipeline
		case component.DataTypeTraces:
			if entry.Traces != nil {
				return nil, fmt.Errorf("duplicate traces pipeline for pipeline named %q", name)
			}
			entry.Traces = pipeline
		default:
			return nil, fmt.Errorf("unknown pipeline type %q", id.Type())
		}

		byName[name] = entry
	}

	// Backfill missing telemetry types with empty configs so callers never
	// need to nil-check a group's pipelines.
	for name, entry := range byName {
		if entry.Metrics == nil {
			entry.Metrics = &pipelines.PipelineConfig{}
		}
		if entry.Logs == nil {
			entry.Logs = &pipelines.PipelineConfig{}
		}
		if entry.Traces == nil {
			entry.Traces = &pipelines.PipelineConfig{}
		}
		byName[name] = entry
	}

	groups := maps.Values(byName)
	slices.SortStableFunc(groups, func(a, b pipelineGroup) int {
		return cmp.Compare(a.Name, b.Name)
	})
	return groups, nil
}
// Receivers returns the deduplicated set of receiver IDs used by any of the
// group's telemetry types.
func (group pipelineGroup) Receivers() []component.ID {
	return mergeIDs(group.Metrics.Receivers, group.Logs.Receivers, group.Traces.Receivers)
}
// Processors returns the deduplicated set of processor IDs used by any of the
// group's telemetry types.
func (group pipelineGroup) Processors() []component.ID {
	return mergeIDs(group.Metrics.Processors, group.Logs.Processors, group.Traces.Processors)
}
// Exporters returns the deduplicated set of exporter IDs used by any of the
// group's telemetry types.
func (group pipelineGroup) Exporters() []component.ID {
	return mergeIDs(group.Metrics.Exporters, group.Logs.Exporters, group.Traces.Exporters)
}
// mergeIDs concatenates the given ID slices into one list, dropping
// duplicates while preserving first-seen order.
func mergeIDs(in ...[]component.ID) []component.ID {
	var (
		merged []component.ID
		seen   = make(map[component.ID]struct{})
	)
	for _, ids := range in {
		for _, id := range ids {
			if _, ok := seen[id]; !ok {
				seen[id] = struct{}{}
				merged = append(merged, id)
			}
		}
	}
	return merged
}
// NextMetrics returns the set of components who should be sent metrics from
// the given component ID. It returns nil when fromID is not part of the
// group's metrics pipeline.
func (group pipelineGroup) NextMetrics(fromID component.InstanceID) []component.InstanceID {
	return nextInPipeline(group.Metrics, fromID)
}
// NextLogs returns the set of components who should be sent logs from the
// given component ID. It returns nil when fromID is not part of the group's
// logs pipeline.
func (group pipelineGroup) NextLogs(fromID component.InstanceID) []component.InstanceID {
	return nextInPipeline(group.Logs, fromID)
}
// NextTraces returns the set of components who should be sent traces from the
// given component ID. It returns nil when fromID is not part of the group's
// traces pipeline.
func (group pipelineGroup) NextTraces(fromID component.InstanceID) []component.InstanceID {
	return nextInPipeline(group.Traces, fromID)
}
// nextInPipeline computes the set of components that fromID should forward
// telemetry to inside the given pipeline. It returns nil when fromID is not a
// member of the pipeline, or when fromID is an exporter (which is terminal).
func nextInPipeline(pipeline *pipelines.PipelineConfig, fromID component.InstanceID) []component.InstanceID {
	switch fromID.Kind {
	case component.KindReceiver, component.KindConnector:
		// Ignore components which aren't part of this pipeline.
		if !findInComponentIds(fromID, pipeline.Receivers) {
			return nil
		}

		// Data flows from receivers/connectors into the first processor when
		// one is configured, otherwise it fans out to every exporter.
		if len(pipeline.Processors) == 0 {
			return toComponentInstanceIDs(component.KindExporter, pipeline.Exporters)
		}
		return []component.InstanceID{{Kind: component.KindProcessor, ID: pipeline.Processors[0]}}

	case component.KindProcessor:
		// Ignore components which aren't part of this pipeline.
		if !findInComponentIds(fromID, pipeline.Processors) {
			return nil
		}

		// Data flows to the following processor when one exists; the final
		// processor fans out to every exporter.
		next := slices.Index(pipeline.Processors, fromID.ID) + 1
		if next < len(pipeline.Processors) {
			return []component.InstanceID{{Kind: component.KindProcessor, ID: pipeline.Processors[next]}}
		}
		return toComponentInstanceIDs(component.KindExporter, pipeline.Exporters)

	case component.KindExporter:
		// Exporters never send to another otelcol component.
		return nil

	default:
		panic(fmt.Sprintf("nextInPipeline: unsupported component kind %v", fromID.Kind))
	}
}
// toComponentInstanceIDs converts a slice of [component.ID] into a slice of
// [component.InstanceID]. Each element in the returned slice will have a
// kind matching the provided kind argument.
func toComponentInstanceIDs(kind component.Kind, ids []component.ID) []component.InstanceID {
	out := make([]component.InstanceID, len(ids))
	for i, id := range ids {
		out[i] = component.InstanceID{Kind: kind, ID: id}
	}
	return out
}
// findInComponentIds reports whether fromID's component ID appears in
// componentIDs.
func findInComponentIds(fromID component.InstanceID, componentIDs []component.ID) bool {
	// component.ID is comparable (it is used as a map key elsewhere in this
	// file), so the slices helper the file already imports applies directly.
	return slices.Contains(componentIDs, fromID.ID)
}