/
events_consumer.go
140 lines (118 loc) · 3.79 KB
/
events_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
//nolint:revive // TODO(PROC) Fix revive linter
package consumer
import (
"time"
"github.com/DataDog/datadog-go/v5/statsd"
"go.uber.org/atomic"
"github.com/DataDog/datadog-agent/pkg/eventmonitor"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/proto/api"
"github.com/DataDog/datadog-agent/pkg/process/events/model"
"github.com/DataDog/datadog-agent/pkg/security/metrics"
smodel "github.com/DataDog/datadog-agent/pkg/security/secl/model"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
// ProcessConsumer is part of the event monitoring module of the system-probe. It receives
// events, batches them in the messages channel and serves the messages to the process-agent
// over GRPC when requested
type ProcessConsumer struct {
// Embedded gRPC server interface; NewProcessConsumer registers the consumer itself as
// the server implementation.
api.EventMonitoringModuleServer
// messages buffers marshaled events between HandleEvent (producer) and
// GetProcessEvents (consumer).
messages chan *api.ProcessEventMessage
// maxMessageBurst caps how many messages a single GetProcessEvents call sends.
maxMessageBurst int
// expiredEvents counts the events dropped by HandleEvent because the messages channel
// was full; SendStats reports the count and resets it to zero.
expiredEvents *atomic.Int64
// statsdClient is used by SendStats to emit the expired events count.
statsdClient statsd.ClientInterface
}
// NewProcessConsumer builds a ProcessConsumer, registers it on the event monitor's gRPC
// server and subscribes it to the fork, exec and exit event types. The messages channel
// is sized to three times the configured event server burst. If a subscription fails,
// its error is returned unchanged together with a nil consumer.
func NewProcessConsumer(evm *eventmonitor.EventMonitor) (*ProcessConsumer, error) {
	consumer := &ProcessConsumer{
		messages:        make(chan *api.ProcessEventMessage, evm.Config.EventServerBurst*3),
		maxMessageBurst: evm.Config.EventServerBurst,
		expiredEvents:   atomic.NewInt64(0),
		statsdClient:    evm.StatsdClient,
	}

	api.RegisterEventMonitoringModuleServer(evm.GRPCServer, consumer)

	// Subscribe in the same order as before: fork, exec, then exit.
	lifecycleEvents := []smodel.EventType{
		smodel.ForkEventType,
		smodel.ExecEventType,
		smodel.ExitEventType,
	}
	for _, eventType := range lifecycleEvents {
		if err := evm.AddEventTypeHandler(eventType, consumer); err != nil {
			return nil, err
		}
	}

	return consumer, nil
}
// Start is a no-op: it performs no work and always returns nil.
//
//nolint:revive // TODO(PROC) Fix revive linter
func (p *ProcessConsumer) Start() error {
return nil
}
// Stop is a no-op: its body is empty.
//
//nolint:revive // TODO(PROC) Fix revive linter
func (p *ProcessConsumer) Stop() {
}
// ID returns the identifier of the process consumer, the constant string "PROCESS".
func (p *ProcessConsumer) ID() string {
return "PROCESS"
}
// SendStats reports the number of events expired since the previous call through the
// statsd client and resets the counter to zero. Nothing is sent when the count is not
// positive. A failure to send is logged as a warning and otherwise ignored.
//
//nolint:revive // TODO(PROC) Fix revive linter
func (p *ProcessConsumer) SendStats() {
	// Swap reads and resets the counter in one atomic step.
	expired := p.expiredEvents.Swap(0)
	if expired <= 0 {
		return
	}

	err := p.statsdClient.Count(metrics.MetricProcessEventsServerExpired, expired, []string{}, 1.0)
	if err != nil {
		log.Warnf("Error sending process consumer stats: %v", err)
	}
}
// HandleEvent implements the event monitor EventHandler interface. It expects a
// *model.ProcessEvent, marshals it and queues the result on the messages channel for
// GetProcessEvents to serve.
//
// HandleEvent never blocks. When the channel is full, the oldest queued message is
// expired to make room; if the channel is still full afterwards, the new event is
// expired instead. Every expired message is counted in expiredEvents. Events of any
// other type, and events that fail to marshal, are logged and discarded.
func (p *ProcessConsumer) HandleEvent(event any) {
	e, ok := event.(*model.ProcessEvent)
	if !ok {
		log.Errorf("Event is not a Process Lifecycle Event")
		return
	}

	data, err := e.MarshalMsg(nil)
	if err != nil {
		log.Errorf("Failed to marshal Process Lifecycle Event: %v", err)
		return
	}

	m := &api.ProcessEventMessage{
		Data: data,
	}

	// Fast path: there is room in the channel.
	select {
	case p.messages <- m:
		return
	default:
	}

	// The channel is full, expire the oldest event. The receive must not block:
	// GetProcessEvents may have drained the channel since the failed send above, and a
	// zero-capacity channel never holds a message to expire. Only count an expiration
	// when a message was actually removed.
	select {
	case <-p.messages:
		p.expiredEvents.Inc()
	default:
	}

	// Try to send the event again
	select {
	case p.messages <- m:
	default:
		// looks like the process msgs channel is full again, expire the current event
		p.expiredEvents.Inc()
	}
}
// GetProcessEvents sends queued process events through a gRPC stream.
//
// It returns nil once maxMessageBurst messages have been sent, or once no message has
// become available for params.TimeoutSeconds seconds (the wait restarts after every
// message sent). It returns the error from stream.Send if a send fails, and the stream
// context's error if the stream is canceled or times out while waiting for a message.
func (p *ProcessConsumer) GetProcessEvents(params *api.GetProcessEventParams, stream api.EventMonitoringModule_GetProcessEventsServer) error {
	timeout := time.Duration(params.TimeoutSeconds) * time.Second
	ctx := stream.Context()

	// One timer is re-armed after each message instead of allocating a new one with
	// time.After on every iteration.
	idle := time.NewTimer(timeout)
	defer idle.Stop()

	msgs := 0
	for msgs < p.maxMessageBurst {
		select {
		case msg := <-p.messages:
			if err := stream.Send(msg); err != nil {
				return err
			}
			msgs++

			// Restart the wait for the next message. If the timer already fired while
			// the message was being sent, discard the stale tick before re-arming it.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(timeout)
		case <-idle.C:
			return nil
		case <-ctx.Done():
			// The caller is gone: stop waiting rather than consuming a message that
			// could no longer be delivered.
			return ctx.Err()
		}
	}

	log.Debugf("Received process-events request from process-agent: sent %d events", msgs)
	return nil
}