/
telemetry.go
242 lines (222 loc) · 9.07 KB
/
telemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022-present Datadog, Inc.
package api
import (
"bytes"
"fmt"
"io"
stdlog "log"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"
"github.com/DataDog/datadog-agent/pkg/trace/api/internal/header"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-go/v5/statsd"
)
// Global tag keys inspected by the telemetry proxy to detect the hosting
// serverless environment.
const (
	functionARNKeyTag = "function_arn"
	originTag         = "origin"
)

// cloudResourceType names the kind of cloud resource reported in the
// dd-cloud-resource-type header.
type cloudResourceType string

// cloudProvider names the cloud vendor reported in the dd-cloud-provider header.
type cloudProvider string

// Supported cloud resource types.
const (
	awsLambda         cloudResourceType = "AWSLambda"
	awsFargate        cloudResourceType = "AWSFargate"
	cloudRun          cloudResourceType = "GCPCloudRun"
	azureAppService   cloudResourceType = "AzureAppService"
	azureContainerApp cloudResourceType = "AzureContainerApp"
)

// Supported cloud providers.
const (
	aws   cloudProvider = "AWS"
	gcp   cloudProvider = "GCP"
	azure cloudProvider = "Azure"
)

// Names of the headers the proxy sets to describe the cloud resource.
const (
	cloudProviderHeader           string = "dd-cloud-provider"
	cloudResourceTypeHeader       string = "dd-cloud-resource-type"
	cloudResourceIdentifierHeader string = "dd-cloud-resource-identifier"
)
// telemetryMultiTransport sends HTTP requests to multiple targets using an
// underlying http.RoundTripper. API keys are set separately for each target.
// For every target, the request's Host and URL host are rewritten to the
// endpoint's Host, the URL scheme is forced to https, and the DD-API-KEY
// header is set to that endpoint's API key.
// When multiple endpoints are in use the response from the main endpoint
// (Endpoints[0]) is proxied back to the client, while for all additional
// endpoints the response is discarded. There is no de-duplication done
// between endpoint hosts or api keys.
//
// Could be extended in the future to allow supporting more product endpoints
// by simply parameterizing metric tags and logger names.
type telemetryMultiTransport struct {
	// Transport performs the actual HTTP round trips for every endpoint.
	Transport http.RoundTripper
	// Endpoints lists the targets; Endpoints[0] is the main endpoint whose
	// response (and error) is returned to the caller.
	Endpoints []*config.Endpoint
	// statsd receives per-endpoint round-trip timing and error-count metrics.
	statsd statsd.ClientInterface
}
// telemetryProxyHandler returns a new HTTP handler which will proxy requests to the
// configured apm_config.telemetry intakes.
//
// Endpoint hosts configured as URLs are reduced to their host[:port] part; endpoints whose
// host cannot be parsed are skipped with an error log. The configured endpoints themselves are
// left untouched; the handler works on private copies. If no valid endpoint remains, the
// returned handler always responds with http.StatusNotFound.
func (r *HTTPReceiver) telemetryProxyHandler() http.Handler {
	// extract and validate Hostnames from configured endpoints
	var endpoints []*config.Endpoint
	for _, endpoint := range r.conf.TelemetryConfig.Endpoints {
		u, err := url.Parse(endpoint.Host)
		if err != nil {
			log.Errorf("Error parsing apm_config.telemetry endpoint %q: %v", endpoint.Host, err)
			continue
		}
		// Copy before rewriting Host: the configured endpoints are shared agent configuration
		// and must not be modified as a side effect of building this handler.
		ep := *endpoint
		if u.Host != "" {
			ep.Host = u.Host
		}
		endpoints = append(endpoints, &ep)
	}
	if len(endpoints) == 0 {
		log.Error("None of the configured apm_config.telemetry endpoints are valid. Telemetry proxy is off")
		return http.NotFoundHandler()
	}

	installSignature := r.conf.InstallSignature

	underlyingTransport := r.conf.NewHTTPTransport()
	// Fix and documentation taken from pkg/trace/api/profiles.go
	// The intake's connection timeout is 60 seconds, which is similar to the default heartbeat periodicity of
	// telemetry clients. When a new heartbeat is simultaneous to the intake closing the connection, Go's ReverseProxy
	// returns a 502 error to the tracer. Ensuring that the agent closes the connection before the intake solves this
	// race condition. A value of 47 was chosen as it's a prime number which doesn't divide 60, reducing the risk of
	// overlap with other timeouts or periodicities. It provides sufficient buffer time compared to 60, whilst still
	// allowing connection reuse.
	underlyingTransport.IdleConnTimeout = 47 * time.Second
	transport := telemetryMultiTransport{
		Transport: underlyingTransport,
		Endpoints: endpoints,
		statsd:    r.statsd,
	}
	limitedLogger := log.NewThrottled(5, 10*time.Second) // limit to 5 messages every 10 seconds
	logger := stdlog.New(limitedLogger, "telemetry.Proxy: ", 0)

	// director decorates every proxied request with agent, container and cloud metadata headers.
	director := func(req *http.Request) {
		req.Header.Set("Via", fmt.Sprintf("trace-agent %s", r.conf.AgentVersion))
		if _, ok := req.Header["User-Agent"]; !ok {
			// explicitly disable User-Agent so it's not set to the default value
			// that net/http gives it: Go-http-client/1.1
			// See https://codereview.appspot.com/7532043
			req.Header.Set("User-Agent", "")
		}

		containerID := r.containerIDProvider.GetContainerID(req.Context(), req.Header)
		if containerID == "" {
			_ = r.statsd.Count("datadog.trace_agent.telemetry_proxy.no_container_id_found", 1, []string{}, 1)
		}
		containerTags := getContainerTags(r.conf.ContainerTags, containerID)

		req.Header.Set("DD-Agent-Hostname", r.conf.Hostname)
		req.Header.Set("DD-Agent-Env", r.conf.DefaultEnv)
		log.Debugf("Setting headers DD-Agent-Hostname=%s, DD-Agent-Env=%s for telemetry proxy", r.conf.Hostname, r.conf.DefaultEnv)
		if containerID != "" {
			req.Header.Set(header.ContainerID, containerID)
		}
		if containerTags != "" {
			req.Header.Set("x-datadog-container-tags", containerTags)
			log.Debugf("Setting header x-datadog-container-tags=%s for telemetry proxy", containerTags)
		}
		if installSignature.Found {
			req.Header.Set("DD-Agent-Install-Id", installSignature.InstallID)
			req.Header.Set("DD-Agent-Install-Type", installSignature.InstallType)
			req.Header.Set("DD-Agent-Install-Time", strconv.FormatInt(installSignature.InstallTime, 10))
		}

		// Cloud resource headers. An AWS Lambda function_arn global tag wins over a Fargate
		// task_arn container tag; a recognized "origin" global tag below then overrides the
		// provider and resource type (and the identifier, when its source tag is present).
		if arn, ok := r.conf.GlobalTags[functionARNKeyTag]; ok {
			req.Header.Set(cloudProviderHeader, string(aws))
			req.Header.Set(cloudResourceTypeHeader, string(awsLambda))
			req.Header.Set(cloudResourceIdentifierHeader, arn)
		} else if taskArn, ok := extractFargateTask(containerTags); ok {
			req.Header.Set(cloudProviderHeader, string(aws))
			req.Header.Set(cloudResourceTypeHeader, string(awsFargate))
			req.Header.Set(cloudResourceIdentifierHeader, taskArn)
		}
		if origin, ok := r.conf.GlobalTags[originTag]; ok {
			switch origin {
			case "cloudrun":
				req.Header.Set(cloudProviderHeader, string(gcp))
				req.Header.Set(cloudResourceTypeHeader, string(cloudRun))
				if serviceName, found := r.conf.GlobalTags["service_name"]; found {
					req.Header.Set(cloudResourceIdentifierHeader, serviceName)
				}
			case "appservice":
				req.Header.Set(cloudProviderHeader, string(azure))
				req.Header.Set(cloudResourceTypeHeader, string(azureAppService))
				if appName, found := r.conf.GlobalTags["app_name"]; found {
					req.Header.Set(cloudResourceIdentifierHeader, appName)
				}
			case "containerapp":
				req.Header.Set(cloudProviderHeader, string(azure))
				req.Header.Set(cloudResourceTypeHeader, string(azureContainerApp))
				if appName, found := r.conf.GlobalTags["app_name"]; found {
					req.Header.Set(cloudResourceIdentifierHeader, appName)
				}
			}
		}
	}
	return &httputil.ReverseProxy{
		Director:  director,
		ErrorLog:  logger,
		Transport: &transport,
	}
}
// extractFargateTask returns the value of the "task_arn" container tag, which the
// telemetry proxy reports as the AWS Fargate resource identifier. The boolean is
// false when containerTags carries no such tag.
func extractFargateTask(containerTags string) (string, bool) {
	const taskARNTag = "task_arn"
	return extractTag(containerTags, taskARNTag)
}

// extractTag scans tags, a comma-separated list of "name:value" entries, and
// returns the value of the first entry whose name equals name. Only the first ':'
// of an entry separates name from value, so values may themselves contain colons.
// Entries without a ':' (including empty ones) are skipped.
func extractTag(tags string, name string) (string, bool) {
	for rest := tags; rest != ""; {
		var entry string
		entry, rest, _ = strings.Cut(rest, ",")
		key, value, ok := strings.Cut(entry, ":")
		if ok && key == name {
			return value, true
		}
	}
	return "", false
}
// RoundTrip sends the request first to Endpoints[0], then sends a copy of the main
// request to every configured additional endpoint.
//
// All requests are sent regardless of any errors. A failing request to an
// additional endpoint is logged and its response discarded; only the main
// endpoint's response and error are returned.
//
// Per the http.RoundTripper contract, the caller's request is never modified
// (each target receives a clone) and its body is always closed. A nil body, which
// httputil.ReverseProxy uses for requests with ContentLength 0, is supported.
func (m *telemetryMultiTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if len(m.Endpoints) == 1 {
		// roundTrip rewrites Host, URL and headers, so hand it a clone. The clone shares
		// req.Body, which the underlying transport consumes and closes.
		return m.roundTrip(req.Clone(req.Context()), m.Endpoints[0])
	}

	// Buffer the body once so every endpoint receives an identical copy.
	var body []byte
	if req.Body != nil {
		var err error
		body, err = io.ReadAll(req.Body)
		req.Body.Close() //nolint:errcheck
		if err != nil {
			return nil, err
		}
	}
	newRequest := func() *http.Request {
		clone := req.Clone(req.Context())
		if req.Body != nil {
			clone.Body = io.NopCloser(bytes.NewReader(body))
		}
		return clone
	}

	// despite the number of endpoints, we always return the response of the first
	rresp, rerr := m.roundTrip(newRequest(), m.Endpoints[0])
	for _, endpoint := range m.Endpoints[1:] {
		resp, err := m.roundTrip(newRequest(), endpoint)
		if err != nil {
			log.Error(err)
			continue
		}
		// we discard responses for all subsequent requests, draining them so the
		// connection can be reused
		io.Copy(io.Discard, resp.Body) //nolint:errcheck
		resp.Body.Close()
	}
	return rresp, rerr
}
// roundTrip retargets req at endpoint and sends it through the underlying transport.
// Retargeting means setting req.Host and the URL host to endpoint.Host, forcing the https
// scheme, and setting DD-API-KEY. Round-trip latency is always reported with an
// "endpoint:<host>" tag; a transport error additionally increments the error counter.
// req is modified in place, so callers must pass a request they own.
func (m *telemetryMultiTransport) roundTrip(req *http.Request, endpoint *config.Endpoint) (*http.Response, error) {
	target := endpoint.Host
	metricTags := []string{"endpoint:" + target}
	start := time.Now()
	defer func() {
		_ = m.statsd.Timing("datadog.trace_agent.telemetry_proxy.roundtrip_ms", time.Since(start), metricTags, 1)
	}()

	req.Host = target
	req.URL.Host = target
	req.URL.Scheme = "https"
	req.Header.Set("DD-API-KEY", endpoint.APIKey)

	resp, err := m.Transport.RoundTrip(req)
	if err != nil {
		_ = m.statsd.Count("datadog.trace_agent.telemetry_proxy.error", 1, metricTags, 1)
	}
	return resp, err
}