/
finder.go
110 lines (95 loc) · 4.29 KB
/
finder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package discover
import (
"context"
"fmt"
"github.com/mariomac/pipes/pipe"
"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/internal/ebpf"
"github.com/grafana/beyla/pkg/internal/ebpf/goruntime"
"github.com/grafana/beyla/pkg/internal/ebpf/grpc"
"github.com/grafana/beyla/pkg/internal/ebpf/httpfltr"
"github.com/grafana/beyla/pkg/internal/ebpf/httpssl"
"github.com/grafana/beyla/pkg/internal/ebpf/nethttp"
"github.com/grafana/beyla/pkg/internal/imetrics"
"github.com/grafana/beyla/pkg/internal/pipe/global"
)
type ProcessFinder struct {
ctx context.Context
cfg *beyla.Config
ctxInfo *global.ContextInfo
}
// nodesMap stores ProcessFinder pipeline architecture
type nodesMap struct {
ProcessWatcher pipe.Start[[]Event[processAttrs]]
WatcherKubeEnricher pipe.Middle[[]Event[processAttrs], []Event[processAttrs]]
CriteriaMatcher pipe.Middle[[]Event[processAttrs], []Event[ProcessMatch]]
ExecTyper pipe.Middle[[]Event[ProcessMatch], []Event[Instrumentable]]
ContainerDBUpdater pipe.Middle[[]Event[Instrumentable], []Event[Instrumentable]]
TraceAttacher pipe.Final[[]Event[Instrumentable]]
}
func (pf *nodesMap) Connect() {
pf.ProcessWatcher.SendTo(pf.WatcherKubeEnricher)
pf.WatcherKubeEnricher.SendTo(pf.CriteriaMatcher)
pf.CriteriaMatcher.SendTo(pf.ExecTyper)
pf.ExecTyper.SendTo(pf.ContainerDBUpdater)
pf.ContainerDBUpdater.SendTo(pf.TraceAttacher)
}
func processWatcher(pf *nodesMap) *pipe.Start[[]Event[processAttrs]] { return &pf.ProcessWatcher }
func ptrWatcherKubeEnricher(pf *nodesMap) *pipe.Middle[[]Event[processAttrs], []Event[processAttrs]] {
return &pf.WatcherKubeEnricher
}
func criteriaMatcher(pf *nodesMap) *pipe.Middle[[]Event[processAttrs], []Event[ProcessMatch]] {
return &pf.CriteriaMatcher
}
func execTyper(pf *nodesMap) *pipe.Middle[[]Event[ProcessMatch], []Event[Instrumentable]] {
return &pf.ExecTyper
}
func containerDBUpdater(pf *nodesMap) *pipe.Middle[[]Event[Instrumentable], []Event[Instrumentable]] {
return &pf.ContainerDBUpdater
}
func traceAttacher(pf *nodesMap) *pipe.Final[[]Event[Instrumentable]] { return &pf.TraceAttacher }
func NewProcessFinder(ctx context.Context, cfg *beyla.Config, ctxInfo *global.ContextInfo) *ProcessFinder {
return &ProcessFinder{ctx: ctx, cfg: cfg, ctxInfo: ctxInfo}
}
// Start the ProcessFinder pipeline in background. It returns a channel where each new discovered
// ebpf.ProcessTracer will be notified.
func (pf *ProcessFinder) Start() (<-chan *ebpf.ProcessTracer, <-chan *Instrumentable, error) {
discoveredTracers, deleteTracers := make(chan *ebpf.ProcessTracer), make(chan *Instrumentable)
gb := pipe.NewBuilder(&nodesMap{}, pipe.ChannelBufferLen(pf.cfg.ChannelBufferLen))
pipe.AddStart(gb, processWatcher, ProcessWatcherFunc(pf.ctx, pf.cfg))
pipe.AddMiddleProvider(gb, ptrWatcherKubeEnricher,
WatcherKubeEnricherProvider(pf.ctxInfo.K8sEnabled, pf.ctxInfo.AppO11y.K8sInformer))
pipe.AddMiddleProvider(gb, criteriaMatcher, CriteriaMatcherProvider(pf.cfg))
pipe.AddMiddleProvider(gb, execTyper, ExecTyperProvider(pf.cfg, pf.ctxInfo.Metrics))
pipe.AddMiddleProvider(gb, containerDBUpdater,
ContainerDBUpdaterProvider(pf.ctxInfo.K8sEnabled, pf.ctxInfo.AppO11y.K8sDatabase))
pipe.AddFinalProvider(gb, traceAttacher, TraceAttacherProvider(&TraceAttacher{
Cfg: pf.cfg,
Ctx: pf.ctx,
DiscoveredTracers: discoveredTracers,
DeleteTracers: deleteTracers,
Metrics: pf.ctxInfo.Metrics,
}))
pipeline, err := gb.Build()
if err != nil {
return nil, nil, fmt.Errorf("can't instantiate discovery.ProcessFinder pipeline: %w", err)
}
pipeline.Start()
return discoveredTracers, deleteTracers, nil
}
// auxiliary functions to instantiate the go and non-go tracers on diverse steps of the
// discovery pipeline
func newGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
// Each program is an eBPF source: net/http, grpc...
return []ebpf.Tracer{
nethttp.New(cfg, metrics),
grpc.New(cfg, metrics),
goruntime.New(cfg, metrics),
}
}
func newNonGoTracersGroup(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
return []ebpf.Tracer{httpfltr.New(cfg, metrics), httpssl.New(cfg, metrics)}
}
func newNonGoTracersGroupUProbes(cfg *beyla.Config, metrics imetrics.Reporter) []ebpf.Tracer {
return []ebpf.Tracer{httpssl.New(cfg, metrics)}
}