-
Notifications
You must be signed in to change notification settings - Fork 486
/
Copy pathtraces.go
184 lines (152 loc) · 4.63 KB
/
traces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package traces
import (
"fmt"
"os"
"sync"
"time"
"contrib.go.opencensus.io/exporter/prometheus"
"github.com/grafana/agent/pkg/logs"
"github.com/grafana/agent/pkg/metrics/instance"
zaplogfmt "github.com/jsternberg/zap-logfmt"
prom_client "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/external/obsreportconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.opentelemetry.io/collector/config/configtelemetry"
)
// Traces wraps the OpenTelemetry collector to enable tracing pipelines.
// It keeps one Instance per named entry of the most recently applied Config.
type Traces struct {
// mut is held by ApplyConfig and Stop for their whole duration.
mut sync.Mutex
// instances maps a config name to the Instance created for it.
instances map[string]*Instance
// leveller is the runtime-adjustable level gate handed to logger; ApplyConfig
// updates it on every call.
leveller *logLeveller
// logger is the base zap logger; per-instance loggers are derived from it.
logger *zap.Logger
// reg is the base registerer; ApplyConfig wraps it with a "traces_config"
// label for each instance.
reg prom_client.Registerer
// promInstanceManager is the manager passed to NewInstance for newly created
// instances.
promInstanceManager instance.Manager
}
// New builds a Traces subsystem and immediately applies cfg to it.
//
// The logger is created with a fresh logLeveller, whose level is then set by
// the initial ApplyConfig call. If applying cfg fails, the error is returned
// and no Traces value is handed back.
func New(logsSubsystem *logs.Logs, promInstanceManager instance.Manager, reg prom_client.Registerer, cfg Config, level logrus.Level) (*Traces, error) {
	// The same leveller is shared by the Traces value and its logger so that
	// later level changes take effect on the existing logger.
	lvl := new(logLeveller)

	t := &Traces{
		instances:           map[string]*Instance{},
		leveller:            lvl,
		logger:              newLogger(lvl),
		reg:                 reg,
		promInstanceManager: promInstanceManager,
	}

	err := t.ApplyConfig(logsSubsystem, promInstanceManager, cfg, level)
	if err != nil {
		return nil, err
	}
	return t, nil
}
// ApplyConfig reconciles the running instances with cfg.
//
// For every entry in cfg.Configs, an instance with the same name is updated in
// place if one already exists, otherwise a new one is created. Instances whose
// name no longer appears in cfg are stopped. The log level is updated first.
//
// On the first error the method returns immediately without replacing
// t.instances; instances that were already updated earlier in the loop keep
// whatever their own ApplyConfig did.
func (t *Traces) ApplyConfig(logsSubsystem *logs.Logs, promInstanceManager instance.Manager, cfg Config, level logrus.Level) error {
	t.mut.Lock()
	defer t.mut.Unlock()

	// Propagate the (possibly changed) log level to the shared leveller.
	t.leveller.SetLevel(level)

	next := make(map[string]*Instance, len(cfg.Configs))
	for _, c := range cfg.Configs {
		// Each instance reports metrics under its own traces_config label.
		instReg := prom_client.WrapRegistererWith(prom_client.Labels{"traces_config": c.Name}, t.reg)

		existing, found := t.instances[c.Name]
		if !found {
			// No instance with this name yet: create one with a derived logger.
			instLogger := t.logger.With(zap.String("traces_config", c.Name))
			created, err := NewInstance(logsSubsystem, instReg, c, instLogger, t.promInstanceManager)
			if err != nil {
				return fmt.Errorf("failed to create tracing instance %s: %w", c.Name, err)
			}
			next[c.Name] = created
			continue
		}

		// Known instance: update it and carry it over to the new map.
		if err := existing.ApplyConfig(logsSubsystem, promInstanceManager, instReg, c); err != nil {
			return err
		}
		next[c.Name] = existing
	}

	// Anything still in t.instances but absent from next was removed from the
	// config, so stop it before the map is swapped.
	for name, inst := range t.instances {
		if _, kept := next[name]; !kept {
			inst.Stop()
		}
	}

	t.instances = next
	return nil
}
// Stop shuts down the tracing subsystem by calling Stop on every instance
// currently tracked, while holding the Traces mutex.
func (t *Traces) Stop() {
	t.mut.Lock()
	defer t.mut.Unlock()

	for name := range t.instances {
		t.instances[name].Stop()
	}
}
// newLogger builds the zap logger used by the traces subsystem.
//
// Output is logfmt-encoded and written to stdout, timestamps are rendered as
// RFC 3339 in UTC, caller information is attached, and every entry carries
// component=traces. zapLevel decides which entries are emitted. An
// informational message is logged once the logger has been built.
func newLogger(zapLevel zapcore.LevelEnabler) *zap.Logger {
	encCfg := zap.NewProductionEncoderConfig()
	encCfg.EncodeTime = func(ts time.Time, enc zapcore.PrimitiveArrayEncoder) {
		enc.AppendString(ts.UTC().Format(time.RFC3339))
	}

	core := zapcore.NewCore(zaplogfmt.NewEncoder(encCfg), os.Stdout, zapLevel)

	logger := zap.New(core, zap.AddCaller()).With(zap.String("component", "traces"))
	logger.Info("Traces Logger Initialized")
	return logger
}
// logLeveller implements the zapcore.LevelEnabler interface and allows for
// switching out log levels at runtime.
type logLeveller struct {
// mut guards inner: SetLevel takes the write lock, Enabled the read lock.
mut sync.RWMutex
// inner is the currently active zap level; its zero value is used until
// SetLevel is first called.
inner zapcore.Level
}
// SetLevel translates a logrus level into the corresponding zap level and
// stores it as the active level.
//
// logrus's Debug and Trace levels both map to zap's Debug level. Any level
// not listed in the switch falls back to Info.
func (l *logLeveller) SetLevel(level logrus.Level) {
	l.mut.Lock()
	defer l.mut.Unlock()

	zapLevel := zapcore.InfoLevel

	switch level {
	case logrus.PanicLevel:
		zapLevel = zapcore.PanicLevel
	case logrus.FatalLevel:
		zapLevel = zapcore.FatalLevel
	case logrus.ErrorLevel:
		zapLevel = zapcore.ErrorLevel
	case logrus.WarnLevel:
		zapLevel = zapcore.WarnLevel
	case logrus.InfoLevel:
		zapLevel = zapcore.InfoLevel
	// Go switch cases do not fall through, so Debug and Trace must share a
	// single case; a separate empty Debug case would leave the level at Info.
	case logrus.DebugLevel, logrus.TraceLevel:
		zapLevel = zapcore.DebugLevel
	}

	l.inner = zapLevel
}
// Enabled reports whether entries at the target level pass the currently
// active level. The active level is read under the read lock.
func (l *logLeveller) Enabled(target zapcore.Level) bool {
	l.mut.RLock()
	current := l.inner
	l.mut.RUnlock()

	return current.Enabled(target)
}
// newMetricViews registers the collector's basic-level obsreport views with
// OpenCensus and registers a Prometheus exporter (namespace "traces") that
// uses reg.
//
// It returns the registered views, or an error if view registration or
// exporter creation fails.
func newMetricViews(reg prom_client.Registerer) ([]*view.View, error) {
	obsMetrics := obsreportconfig.Configure(configtelemetry.LevelBasic)
	if err := view.Register(obsMetrics.Views...); err != nil {
		return nil, fmt.Errorf("failed to register views: %w", err)
	}

	opts := prometheus.Options{
		Namespace:  "traces",
		Registerer: reg,
	}
	exporter, err := prometheus.NewExporter(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to create prometheus exporter: %w", err)
	}

	view.RegisterExporter(exporter)
	return obsMetrics.Views, nil
}