forked from open-telemetry/opentelemetry-collector-contrib
/
traces.go
119 lines (105 loc) · 3.71 KB
/
traces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package filterprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
import (
"context"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/multierr"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent"
)
type filterSpanProcessor struct {
skipSpanExpr expr.BoolExpr[ottlspan.TransformContext]
skipSpanEventExpr expr.BoolExpr[ottlspanevent.TransformContext]
logger *zap.Logger
}
func newFilterSpansProcessor(set component.TelemetrySettings, cfg *Config) (*filterSpanProcessor, error) {
var err error
fsp := &filterSpanProcessor{
logger: set.Logger,
}
if cfg.Traces.SpanConditions != nil || cfg.Traces.SpanEventConditions != nil {
if cfg.Traces.SpanConditions != nil {
fsp.skipSpanExpr, err = filterottl.NewBoolExprForSpan(cfg.Traces.SpanConditions, filterottl.StandardSpanFuncs(), cfg.ErrorMode, set)
if err != nil {
return nil, err
}
}
if cfg.Traces.SpanEventConditions != nil {
fsp.skipSpanEventExpr, err = filterottl.NewBoolExprForSpanEvent(cfg.Traces.SpanEventConditions, filterottl.StandardSpanEventFuncs(), cfg.ErrorMode, set)
if err != nil {
return nil, err
}
}
return fsp, nil
}
fsp.skipSpanExpr, err = filterspan.NewSkipExpr(&cfg.Spans)
if err != nil {
return nil, err
}
includeMatchType, excludeMatchType := "[None]", "[None]"
if cfg.Spans.Include != nil {
includeMatchType = string(cfg.Spans.Include.MatchType)
}
if cfg.Spans.Exclude != nil {
excludeMatchType = string(cfg.Spans.Exclude.MatchType)
}
set.Logger.Info(
"Span filter configured",
zap.String("[Include] match_type", includeMatchType),
zap.String("[Exclude] match_type", excludeMatchType),
)
return fsp, nil
}
// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters.
func (fsp *filterSpanProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
if fsp.skipSpanExpr == nil && fsp.skipSpanEventExpr == nil {
return td, nil
}
var errors error
td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
resource := rs.Resource()
rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool {
scope := ss.Scope()
ss.Spans().RemoveIf(func(span ptrace.Span) bool {
if fsp.skipSpanExpr != nil {
skip, err := fsp.skipSpanExpr.Eval(ctx, ottlspan.NewTransformContext(span, scope, resource))
if err != nil {
errors = multierr.Append(errors, err)
return false
}
if skip {
return true
}
}
if fsp.skipSpanEventExpr != nil {
span.Events().RemoveIf(func(spanEvent ptrace.SpanEvent) bool {
skip, err := fsp.skipSpanEventExpr.Eval(ctx, ottlspanevent.NewTransformContext(spanEvent, span, scope, resource))
if err != nil {
errors = multierr.Append(errors, err)
return false
}
return skip
})
}
return false
})
return ss.Spans().Len() == 0
})
return rs.ScopeSpans().Len() == 0
})
if errors != nil {
fsp.logger.Error("failed processing traces", zap.Error(errors))
return td, errors
}
if td.ResourceSpans().Len() == 0 {
return td, processorhelper.ErrSkipProcessingData
}
return td, nil
}