-
Notifications
You must be signed in to change notification settings - Fork 485
/
spanlogs.go
129 lines (106 loc) · 3.68 KB
/
spanlogs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Package spanlogs provides an otelcol.connector.spanlogs component.
package spanlogs
import (
"context"
"fmt"
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/grafana/river"
)
func init() {
component.Register(component.Registration{
Name: "otelcol.connector.spanlogs",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
Build: func(o component.Options, a component.Arguments) (component.Component, error) {
return New(o, a.(Arguments))
},
})
}
// Arguments configures the otelcol.connector.spanlogs component.
type Arguments struct {
Spans bool `river:"spans,attr,optional"`
Roots bool `river:"roots,attr,optional"`
Processes bool `river:"processes,attr,optional"`
SpanAttributes []string `river:"span_attributes,attr,optional"`
ProcessAttributes []string `river:"process_attributes,attr,optional"`
Overrides OverrideConfig `river:"overrides,block,optional"`
Labels []string `river:"labels,attr,optional"`
// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}
type OverrideConfig struct {
LogsTag string `river:"logs_instance_tag,attr,optional"`
ServiceKey string `river:"service_key,attr,optional"`
SpanNameKey string `river:"span_name_key,attr,optional"`
StatusKey string `river:"status_key,attr,optional"`
DurationKey string `river:"duration_key,attr,optional"`
TraceIDKey string `river:"trace_id_key,attr,optional"`
}
var (
_ river.Defaulter = (*Arguments)(nil)
)
// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Overrides: OverrideConfig{
LogsTag: "traces",
ServiceKey: "svc",
SpanNameKey: "span",
StatusKey: "status",
DurationKey: "dur",
TraceIDKey: "tid",
},
}
// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}
// Component is the otelcol.exporter.spanlogs component.
type Component struct {
consumer *consumer
}
var _ component.Component = (*Component)(nil)
// New creates a new otelcol.exporter.spanlogs component.
func New(o component.Options, c Arguments) (*Component, error) {
if c.Output.Traces != nil || c.Output.Metrics != nil {
level.Warn(o.Logger).Log("msg", "non-log output detected; this component only works for log outputs and trace inputs")
}
nextLogs := fanoutconsumer.Logs(c.Output.Logs)
consumer, err := NewConsumer(c, nextLogs)
if err != nil {
return nil, fmt.Errorf("failed to create a traces consumer due to error: %w", err)
}
res := &Component{
consumer: consumer,
}
if err := res.Update(c); err != nil {
return nil, err
}
// Export the consumer.
// This will remain the same throughout the component's lifetime,
// so we do this during component construction.
export := lazyconsumer.New(context.Background())
export.SetConsumers(res.consumer, nil, nil)
o.OnStateChange(otelcol.ConsumerExports{Input: export})
return res, nil
}
// Run implements Component.
func (c *Component) Run(ctx context.Context) error {
for range ctx.Done() {
return nil
}
return nil
}
// Update implements Component.
func (c *Component) Update(newConfig component.Arguments) error {
cfg := newConfig.(Arguments)
nextLogs := fanoutconsumer.Logs(cfg.Output.Logs)
err := c.consumer.UpdateOptions(cfg, nextLogs)
if err != nil {
return fmt.Errorf("failed to update traces consumer due to error: %w", err)
}
return nil
}