-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
agent.go
210 lines (174 loc) · 5.43 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
// Package agent holds agent related files
package agent
import (
"bytes"
"context"
"fmt"
"io"
"runtime"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
"github.com/DataDog/datadog-agent/pkg/security/common"
"github.com/DataDog/datadog-agent/pkg/security/proto/api"
"github.com/DataDog/datadog-agent/pkg/security/security_profile/dump"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
// RuntimeSecurityAgent represents the main wrapper for the Runtime Security product.
// It consumes security events and activity dumps streamed by system-probe through
// its client and forwards them to the reporter / activity dump storage.
type RuntimeSecurityAgent struct {
// hostname is not read by the code in this file (presumably used elsewhere in the package — confirm).
hostname string
// reporter receives raw security events in DispatchEvent; set by Start, nil until then.
reporter common.RawReporter
// client is the connection to system-probe used to open the event and activity dump streams; closed by Stop.
client *RuntimeSecurityClient
// running is the loop condition of the listeners: set to true by Start, false by Stop.
running *atomic.Bool
// wg tracks the listener goroutines so that Stop can wait for them.
wg sync.WaitGroup
// connected reports whether the event listener currently holds a stream to system-probe.
connected *atomic.Bool
// eventReceived counts the security events received from system-probe.
eventReceived *atomic.Uint64
// activityDumpReceived counts the activity dumps received from system-probe.
activityDumpReceived *atomic.Uint64
// telemetry is optional: when nil, no telemetry goroutine is started and
// profiled containers are not registered.
telemetry *telemetry
// endpoints is stored by Start; it is not read by the code in this file.
endpoints *config.Endpoints
// cancel cancels the context passed to the telemetry goroutine; set by Start, called by Stop.
cancel context.CancelFunc
// activity dump
// storage persists the raw activity dumps received from system-probe.
storage *dump.ActivityDumpStorageManager
}
// RSAOptions holds the optional switches of the runtime security agent.
//
// Neither field is read in this file. LogProfiledWorkloads presumably toggles
// logging of profiled workloads, and IgnoreDDAgentContainers presumably excludes
// the Datadog agent's own containers. Confirm both against their consumers.
type RSAOptions struct {
	LogProfiledWorkloads, IgnoreDDAgentContainers bool
}
// Start launches the runtime security agent.
//
// It records the reporter and endpoints and marks the agent as running. It then
// spawns the system-probe event listener, the activity dump listener (Linux only),
// and the telemetry loop when telemetry is configured. The listener goroutines are
// registered on rsa.wg before they are spawned, so a subsequent Stop always waits
// for them.
func (rsa *RuntimeSecurityAgent) Start(reporter common.RawReporter, endpoints *config.Endpoints) {
	rsa.reporter = reporter
	rsa.endpoints = endpoints

	ctx, cancel := context.WithCancel(context.Background())
	rsa.cancel = cancel

	rsa.running.Store(true)

	// Start the system-probe events listener. wg.Add must happen before the
	// goroutine is spawned. Calling it from inside the goroutine races with
	// Stop's wg.Wait, which may then return while the listener is still running.
	rsa.wg.Add(1)
	go func() {
		defer rsa.wg.Done()
		rsa.eventListenerLoop()
	}()

	if runtime.GOOS == "linux" {
		// Start activity dumps listener
		rsa.wg.Add(1)
		go func() {
			defer rsa.wg.Done()
			rsa.activityDumpListenerLoop()
		}()
	}

	if rsa.telemetry != nil {
		// Send Runtime Security Agent telemetry. This goroutine is not tracked by
		// rsa.wg. It is presumably stopped through ctx, which Stop cancels
		// (telemetry.run is defined elsewhere — confirm).
		go rsa.telemetry.run(ctx, rsa)
	}
}

// Stop stops the runtime security agent.
//
// It cancels the telemetry context, tells the listeners to exit, closes the
// system-probe client, and waits for the listener goroutines to return.
func (rsa *RuntimeSecurityAgent) Stop() {
	// cancel is only set by Start. Guard it so that stopping an agent that was
	// never started does not panic on a nil function call.
	if rsa.cancel != nil {
		rsa.cancel()
	}
	rsa.running.Store(false)
	rsa.client.Close()
	rsa.wg.Wait()
}

// StartEventListener starts listening for new events from system-probe.
//
// It blocks until the agent stops running. It registers itself on rsa.wg for its
// whole lifetime so that it stays safe to call directly. When running it in a
// goroutine, prefer Start, which registers the goroutine before spawning it.
func (rsa *RuntimeSecurityAgent) StartEventListener() {
	rsa.wg.Add(1)
	defer rsa.wg.Done()
	rsa.eventListenerLoop()
}

// eventListenerLoop connects to the system-probe event stream and dispatches
// every received event. It reconnects every 2 seconds until rsa.running is false.
// Connection errors are logged at most once per tick of an exponential backoff
// ticker, to avoid flooding the logs.
func (rsa *RuntimeSecurityAgent) eventListenerLoop() {
	rsa.connected.Store(false)

	logTicker := newLogBackoffTicker()
	// Release the ticker once the listener returns. Previously it was never
	// stopped and outlived the listener.
	defer logTicker.Stop()

	for rsa.running.Load() {
		stream, err := rsa.client.GetEvents()
		if err != nil {
			rsa.connected.Store(false)

			// Non-blocking: only log when the backoff ticker has fired.
			select {
			case <-logTicker.C:
				msg := fmt.Sprintf("error while connecting to the runtime security module: %v", err)
				if e, ok := status.FromError(err); ok {
					switch e.Code() {
					case codes.Unavailable:
						msg += ", please check that the runtime security module is enabled in the system-probe.yaml config file"
					}
				}
				log.Error(msg)
			default:
				// do nothing
			}

			// retry in 2 seconds
			time.Sleep(2 * time.Second)
			continue
		}

		if !rsa.connected.Load() {
			rsa.connected.Store(true)
			log.Info("Successfully connected to the runtime security module")
		}

		for {
			// Get new event from stream. Any Recv failure yields a nil message,
			// which ends this stream and triggers a reconnection attempt.
			in, err := stream.Recv()
			if err == io.EOF || in == nil {
				break
			}

			log.Tracef("Got message from rule `%s` for event `%s`", in.RuleID, string(in.Data))

			rsa.eventReceived.Inc()

			// Dispatch security event
			rsa.DispatchEvent(in)
		}
	}
}

// StartActivityDumpListener starts listening for new activity dumps from
// system-probe.
//
// It blocks until the agent stops running. It registers itself on rsa.wg for its
// whole lifetime so that it stays safe to call directly. When running it in a
// goroutine, prefer Start, which registers the goroutine before spawning it.
func (rsa *RuntimeSecurityAgent) StartActivityDumpListener() {
	rsa.wg.Add(1)
	defer rsa.wg.Done()
	rsa.activityDumpListenerLoop()
}

// activityDumpListenerLoop connects to the system-probe activity dump stream and
// dispatches every received dump. It retries every 2 seconds on connection
// failure, without logging, until rsa.running is false.
func (rsa *RuntimeSecurityAgent) activityDumpListenerLoop() {
	for rsa.running.Load() {
		stream, err := rsa.client.GetActivityDumpStream()
		if err != nil {
			// retry in 2 seconds
			time.Sleep(2 * time.Second)
			continue
		}

		for {
			// Get new activity dump from stream
			msg, err := stream.Recv()
			if err == io.EOF || msg == nil {
				break
			}

			log.Tracef("Got activity dump [%s]", msg.GetDump().GetMetadata().GetName())

			rsa.activityDumpReceived.Inc()

			// Dispatch activity dump
			rsa.DispatchActivityDump(msg)
		}
	}
}
// DispatchEvent dispatches a security event message to the subsystems of the
// runtime security agent.
//
// The event data, service and tags are forwarded to the raw reporter. The call
// is a no-op when no reporter has been set (it is assigned by Start) or when evt
// is nil.
func (rsa *RuntimeSecurityAgent) DispatchEvent(evt *api.SecurityEventMessage) {
	// evt.Service is a plain field access. Unlike the generated GetData/GetTags
	// getters, which are nil-safe by protobuf convention, it would panic on a
	// nil message, so drop the message instead.
	if rsa.reporter == nil || evt == nil {
		return
	}
	rsa.reporter.ReportRaw(evt.GetData(), evt.Service, evt.GetTags()...)
}
// DispatchActivityDump forwards an activity dump message to the backend.
//
// It decodes the dump carried by msg. When telemetry is enabled, it registers
// the profiled container image. It then persists the raw dump data
// (msg.GetData()) for every entry of the dump's StorageRequests. Errors are
// logged and not returned.
func (rsa *RuntimeSecurityAgent) DispatchActivityDump(msg *api.ActivityDumpStreamMessage) {
	// parse dump from message. The local is named ad so that it does not shadow
	// the dump package.
	ad, err := dump.NewActivityDumpFromMessage(msg.GetDump())
	if err != nil {
		log.Errorf("%v", err)
		return
	}

	if rsa.telemetry != nil {
		// register for telemetry for this container
		imageName, imageTag := ad.GetImageNameTag()
		rsa.telemetry.registerProfiledContainer(imageName, imageTag)
	}

	// Persisting the dump must not depend on telemetry being enabled.
	// Previously this loop sat inside the telemetry branch, so dumps were
	// silently dropped whenever rsa.telemetry was nil.
	// NOTE(review): the same *bytes.Buffer is passed to every PersistRaw call.
	// If PersistRaw reads from (drains) it, later entries would see an empty
	// buffer. Confirm against PersistRaw.
	raw := bytes.NewBuffer(msg.GetData())
	for _, requests := range ad.StorageRequests {
		if err := rsa.storage.PersistRaw(requests, ad, raw); err != nil {
			log.Errorf("%v", err)
		}
	}
}
// newLogBackoffTicker builds a ticker whose ticks are spaced by an exponential
// backoff. The spacing starts at 2s, is capped at 60s, and the ticker never gives
// up. The event listener uses it to rate-limit its connection error logs.
func newLogBackoffTicker() *backoff.Ticker {
	const (
		firstTick = 2 * time.Second
		maxTick   = 60 * time.Second
	)

	policy := backoff.NewExponentialBackOff()
	policy.InitialInterval = firstTick
	policy.MaxInterval = maxTick
	// A zero MaxElapsedTime means the backoff never stops ticking.
	policy.MaxElapsedTime = 0
	// Reset so that the customized InitialInterval becomes the current interval.
	policy.Reset()

	return backoff.NewTicker(policy)
}