-
Notifications
You must be signed in to change notification settings - Fork 423
/
remoteconfig.go
382 lines (337 loc) · 10.5 KB
/
remoteconfig.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.
package remoteconfig
import (
"bytes"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"os"
"strings"
"time"
rc "github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)
// Callback represents a function that can process a remote config update.
// A Callback function can be registered to a remote config client (see
// Client.RegisterCallback) to automatically react upon receiving updates for a product.
//
// The argument maps each config file path of the product to its raw content. A nil
// content means the config file was part of the previous repository state but is not
// present anymore after the update.
//
// The returned map is keyed by config file path and gives the processing status of each
// config file; the client records every returned status in its repository.
type Callback func(u ProductUpdate) map[string]rc.ApplyStatus
// Capability represents a bit index to be set in clientData.Capabilities in order to register a client
// for a specific capability. The value of a Capability is the position of the bit, not a bit mask.
type Capability uint
const (
// Bit index 0 is skipped on purpose: the first usable capability has index 1.
_ Capability = iota
// ASMActivation represents the capability to activate ASM through remote configuration
ASMActivation
// ASMIPBlocking represents the capability for ASM to block requests based on user IP
ASMIPBlocking
// ASMDDRules represents the capability to update the rules used by the ASM WAF for threat detection
ASMDDRules
)
// DefaultClientConfig builds the baseline configuration of a remote config client.
// The environment and TUF root are taken from the DD_ENV and DD_RC_TUF_ROOT
// environment variables, the runtime ID and service name from the global tracer
// configuration, and the HTTP client is given a 10 second timeout. Updates are
// polled every second. AgentURL, AppVersion, Products and Capabilities are left
// at their zero value and must be filled in by the caller.
func DefaultClientConfig() ClientConfig {
	var cfg ClientConfig
	cfg.Env = os.Getenv("DD_ENV")
	cfg.HTTP = &http.Client{Timeout: 10 * time.Second}
	cfg.PollInterval = time.Second
	cfg.RuntimeID = globalconfig.RuntimeID()
	cfg.ServiceName = globalconfig.ServiceName()
	cfg.TracerVersion = version.Tag
	cfg.TUFRoot = os.Getenv("DD_RC_TUF_ROOT")
	return cfg
}
// ProductUpdate represents an update for a specific product.
// It is a map of config file path to raw file content. A path mapped to a nil
// content denotes a config file that was present in the repository before the
// update and is missing after it.
type ProductUpdate map[string][]byte
// ClientConfig contains the required values to configure a remoteconfig client.
// DefaultClientConfig returns a value with sensible defaults for most fields.
type ClientConfig struct {
// The address at which the agent is listening for remoteconfig update requests on.
// The client appends "/v0.7/config" to it to build the polled endpoint.
AgentURL string
// The semantic version of the user's application
AppVersion string
// The env this tracer is running in
Env string
// The time interval between two client polls to the agent for updates
PollInterval time.Duration
// A list of remote config products this client is interested in
Products []string
// The tracer's runtime id
RuntimeID string
// The name of the user's application
ServiceName string
// The semantic version of the tracer
TracerVersion string
// The base TUF root metadata file
TUFRoot string
// The capabilities of the client, each one being the index of a bit to set
// in the capabilities bit field sent to the agent
Capabilities []Capability
// HTTP is the HTTP client used to receive config updates.
// When nil, NewClient falls back to the HTTP client of DefaultClientConfig.
HTTP *http.Client
}
// A Client interacts with an Agent to update and track the state of remote
// configuration. It must be created with NewClient, which initializes the
// unexported fields below.
type Client struct {
ClientConfig
// clientID is the random identifier generated for this client at creation time
clientID string
// endpoint is the agent URL the update requests are sent to
endpoint string
// repository holds the remote config state (configs, cached files, versions)
repository *rc.Repository
// stop is closed by Stop to terminate the poll loop started by Start
stop chan struct{}
// callbacks lists the registered callbacks, keyed by product name
callbacks map[string][]Callback
// lastError is the result of the last applied update; when non-nil it is
// reported to the agent in the next update request
lastError error
}
// NewClient creates a new remoteconfig Client from the given configuration.
//
// Missing values are defaulted from DefaultClientConfig: a nil HTTP client is
// replaced by the default one, and a zero or negative PollInterval is replaced
// by the default interval (time.NewTicker panics on non-positive durations, which
// would otherwise crash the poll goroutine started by Start).
//
// It returns an error when the underlying remote config repository cannot be
// created.
func NewClient(config ClientConfig) (*Client, error) {
	repo, err := rc.NewUnverifiedRepository()
	if err != nil {
		return nil, err
	}
	if config.HTTP == nil {
		config.HTTP = DefaultClientConfig().HTTP
	}
	if config.PollInterval <= 0 {
		config.PollInterval = DefaultClientConfig().PollInterval
	}
	// Drop trailing slashes so that an agent URL such as "http://host:8126/" does
	// not yield an endpoint with a doubled slash.
	agentURL := strings.TrimRight(config.AgentURL, "/")
	return &Client{
		ClientConfig: config,
		clientID:     generateID(),
		endpoint:     fmt.Sprintf("%s/v0.7/config", agentURL),
		repository:   repo,
		stop:         make(chan struct{}),
		lastError:    nil,
		callbacks:    map[string][]Callback{},
	}, nil
}
// Start launches the update poll loop in its own goroutine and returns
// immediately. The loop asks the agent for updates once per PollInterval
// (the first poll happens after one full interval) and exits when the
// client's stop channel is closed by Stop.
func (c *Client) Start() {
	poll := func() {
		tick := time.NewTicker(c.PollInterval)
		defer tick.Stop()
		for {
			select {
			case <-tick.C:
				c.updateState()
			case <-c.stop:
				return
			}
		}
	}
	go poll()
}
// Stop stops the client's update poll loop by closing the stop channel.
//
// Calling Stop again after it returned is a no-op instead of panicking on a
// double close. NOTE: the check is not synchronized, so Stop must still not be
// called from several goroutines at the same time.
func (c *Client) Stop() {
	select {
	case <-c.stop:
		// Already closed by a previous call: closing again would panic.
	default:
		close(c.stop)
	}
}
// updateState performs one poll cycle: it sends the current client state to the
// agent, reads the response and, when it carries an update, applies it to the
// repository. The outcome of applying the update is stored in c.lastError so
// that it is reported to the agent by the next request. Failures that happen
// before an update is applied are only logged and leave c.lastError untouched.
func (c *Client) updateState() {
	data, err := c.newUpdateRequest()
	if err != nil {
		log.Error("remoteconfig: unexpected error while creating a new update request payload: %v", err)
		return
	}

	req, err := http.NewRequest(http.MethodGet, c.endpoint, &data)
	if err != nil {
		log.Error("remoteconfig: unexpected error while creating a new http request: %v", err)
		return
	}

	resp, err := c.HTTP.Do(req)
	if err != nil {
		log.Debug("remoteconfig: http request error: %v", err)
		return
	}
	// Drain and close the response body when returning so that the connection can
	// be reused (cf. https://pkg.go.dev/net/http#Client.Do). The leftover bytes are
	// discarded without being buffered, and errors are irrelevant at this point.
	defer func() {
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	if sc := resp.StatusCode; sc != http.StatusOK {
		// Log the numeric code too: http.StatusText is empty for non-standard codes.
		log.Debug("remoteconfig: http request error: response status code is not 200 (OK) but %d (%s)", sc, http.StatusText(sc))
		return
	}

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Error("remoteconfig: http request error: could not read the response body: %v", err)
		return
	}

	// An empty JSON object or null means there is nothing to update. Surrounding
	// whitespace (e.g. a trailing newline) is ignored for this check.
	if body := string(bytes.TrimSpace(respBody)); body == `{}` || body == `null` {
		return
	}

	var update clientGetConfigsResponse
	if err := json.Unmarshal(respBody, &update); err != nil {
		log.Error("remoteconfig: http request error: could not parse the json response body: %v", err)
		return
	}

	c.lastError = c.applyUpdate(&update)
}
// RegisterCallback adds f to the callbacks run whenever the client receives a
// configuration update for the given product. Several callbacks can be
// registered for the same product; they are kept in registration order.
//
// NOTE(review): the callbacks map is not guarded by any lock visible in this
// file while the poll loop reads it — presumably callbacks are expected to be
// registered before Start is called; confirm with callers.
func (c *Client) RegisterCallback(f Callback, product string) {
	registered := c.callbacks[product]
	c.callbacks[product] = append(registered, f)
}
// applyUpdate applies an update received from the agent to the repository and
// notifies the callbacks registered for every impacted product.
//
// A product is impacted when the repository reports it as updated, or when one
// of its config files was part of the repository state before the update and is
// missing afterwards. In the latter case the callbacks receive the missing path
// mapped to a nil payload. The statuses returned by the callbacks are recorded
// in the repository.
//
// It returns an error when the repository state cannot be read or when the
// repository rejects the update; no callback is invoked in that case.
func (c *Client) applyUpdate(pbUpdate *clientGetConfigsResponse) error {
	fileMap := make(map[string][]byte, len(pbUpdate.TargetFiles))
	productUpdates := make(map[string]ProductUpdate, len(c.Products))
	// Allocate each product's update exactly once, before walking the target files.
	// Allocating inside the file loop would discard the files collected so far every
	// time a new file is visited, leaving only the last file of the update.
	for _, p := range c.Products {
		productUpdates[p] = make(ProductUpdate)
	}
	for _, f := range pbUpdate.TargetFiles {
		fileMap[f.Path] = f.Raw
		for _, p := range c.Products {
			// Match the product as a whole path segment so that a product whose name
			// is a prefix of another one (e.g. ASM and ASM_DD) does not receive the
			// other product's files.
			if strings.Contains(f.Path, "/"+p+"/") {
				productUpdates[p][f.Path] = f.Raw
			}
		}
	}

	// mapify returns the config path -> product mapping of a repository state.
	// It relies on s.Configs and s.CachedFiles being parallel slices
	// (NOTE(review): presumably guaranteed by the repository — confirm).
	mapify := func(s *rc.RepositoryState) map[string]string {
		m := make(map[string]string)
		for i := range s.Configs {
			path := s.CachedFiles[i].Path
			product := s.Configs[i].Product
			m[path] = product
		}
		return m
	}

	// Check the repository state before and after the update to detect which configs are not being sent anymore.
	// This is needed because some products can stop sending configurations, and we want to make sure that the subscribers
	// are provided with this information in this case
	stateBefore, err := c.repository.CurrentState()
	if err != nil {
		return fmt.Errorf("repository current state error: %w", err)
	}
	products, err := c.repository.Update(rc.Update{
		TUFRoots:      pbUpdate.Roots,
		TUFTargets:    pbUpdate.Targets,
		TargetFiles:   fileMap,
		ClientConfigs: pbUpdate.ClientConfigs,
	})
	if err != nil {
		return fmt.Errorf("repository update error: %w", err)
	}
	stateAfter, err := c.repository.CurrentState()
	if err != nil {
		return fmt.Errorf("repository current state error after update: %w", err)
	}

	// Create a config files diff between before/after the update to see which config files are missing
	mBefore := mapify(&stateBefore)
	for k := range mapify(&stateAfter) {
		delete(mBefore, k)
	}

	// Set the payload data to nil for missing config files. The callbacks then can handle the nil config case to detect
	// that this config will not be updated anymore.
	updatedProducts := make(map[string]struct{})
	for path, product := range mBefore {
		if productUpdates[product] == nil {
			productUpdates[product] = make(ProductUpdate)
		}
		productUpdates[product][path] = nil
		updatedProducts[product] = struct{}{}
	}
	// Aggregate updated products and missing products so that callbacks get called for both
	for _, p := range products {
		updatedProducts[p] = struct{}{}
	}

	// Performs the callbacks registered for all updated products and update the application status in the repository
	// (RCTE2)
	for p := range updatedProducts {
		for _, fn := range c.callbacks[p] {
			for path, status := range fn(productUpdates[p]) {
				c.repository.UpdateApplyStatus(path, status)
			}
		}
	}

	return nil
}
// newUpdateRequest serializes the client's current view of the remote config
// state into the JSON payload sent to the agent when polling for updates.
//
// The payload carries the root and targets versions, the cached target files
// with their hex-encoded hashes, the client and tracer identification, and the
// capabilities bit field. When the last update failed to apply, the error is
// reported and the per-config states are left out; otherwise the state of every
// config is included.
//
// It returns an error when the repository state cannot be read or when the JSON
// encoding fails; the returned buffer is empty in both cases.
func (c *Client) newUpdateRequest() (bytes.Buffer, error) {
	state, err := c.repository.CurrentState()
	if err != nil {
		return bytes.Buffer{}, err
	}
	// Temporary check while using untrusted repo, for which no initial root file is provided
	if state.RootsVersion < 1 {
		state.RootsVersion = 1
	}

	// Describe the files already cached by the client.
	cachedFiles := make([]*targetFileMeta, 0, len(state.CachedFiles))
	for _, cached := range state.CachedFiles {
		hashes := make([]*targetFileHash, 0, len(cached.Hashes))
		for algorithm, digest := range cached.Hashes {
			hashes = append(hashes, &targetFileHash{
				Algorithm: algorithm,
				Hash:      hex.EncodeToString(digest),
			})
		}
		cachedFiles = append(cachedFiles, &targetFileMeta{
			Path:   cached.Path,
			Length: int64(cached.Length),
			Hashes: hashes,
		})
	}

	// Either report the last error, or the state of each config: the config
	// states stay nil when an error is reported.
	var (
		errMsg       string
		configStates []*configState
	)
	hasError := c.lastError != nil
	if hasError {
		errMsg = c.lastError.Error()
	} else {
		configStates = make([]*configState, 0, len(state.Configs))
		for _, cfg := range state.Configs {
			configStates = append(configStates, &configState{
				ID:         cfg.ID,
				Version:    cfg.Version,
				Product:    cfg.Product,
				ApplyState: cfg.ApplyStatus.State,
				ApplyError: cfg.ApplyStatus.Error,
			})
		}
	}

	// Each capability is the index of a bit to raise in the capabilities bit field.
	capabilities := big.NewInt(0)
	for _, bit := range c.Capabilities {
		capabilities.SetBit(capabilities, int(bit), 1)
	}

	tracer := &clientTracer{
		RuntimeID:     c.RuntimeID,
		Language:      "go",
		TracerVersion: c.TracerVersion,
		Service:       c.ServiceName,
		Env:           c.Env,
		AppVersion:    c.AppVersion,
	}
	client := &clientData{
		State: &clientState{
			RootVersion:    uint64(state.RootsVersion),
			TargetsVersion: uint64(state.TargetsVersion),
			ConfigStates:   configStates,
			HasError:       hasError,
			Error:          errMsg,
		},
		ID:           c.clientID,
		Products:     c.Products,
		IsTracer:     true,
		ClientTracer: tracer,
		Capabilities: capabilities.Bytes(),
	}
	req := clientGetConfigsRequest{
		Client:            client,
		CachedTargetFiles: cachedFiles,
	}

	var payload bytes.Buffer
	if err := json.NewEncoder(&payload).Encode(&req); err != nil {
		return bytes.Buffer{}, err
	}
	return payload, nil
}
var (
	// idSize is the number of characters of a generated client ID.
	idSize = 21
	// idAlphabet is the set of 64 characters a client ID is made of.
	idAlphabet = []rune("_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
)

// generateID returns a random identifier of idSize characters taken from
// idAlphabet. The randomness comes from crypto/rand: the low 6 bits of each
// random byte select one of the 64 characters of the alphabet. It panics when
// the random source cannot be read.
func generateID() string {
	raw := make([]byte, idSize)
	if _, err := rand.Read(raw); err != nil {
		panic(err)
	}
	out := make([]rune, 0, idSize)
	for _, b := range raw {
		out = append(out, idAlphabet[b&63])
	}
	return string(out)
}