/
pipeline_stats.go
158 lines (146 loc) · 5.88 KB
/
pipeline_stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package api
import (
"bytes"
"errors"
"fmt"
"io"
stdlog "log"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/datadog-go/v5/statsd"
)
const (
// pipelineStatsURLSuffix is the path appended to each configured endpoint
// host to build the pipeline stats intake URL.
pipelineStatsURLSuffix = "/api/v0.1/pipeline_stats"
)
// pipelineStatsEndpoints returns the pipeline stats intake URLs and the
// corresponding API keys, one pair per entry of cfg.Endpoints and in the same
// order, so that apiKeys[i] belongs to urls[i].
//
// It returns an error when cfg.Endpoints is empty, when the first endpoint has
// an empty host or API key, or when any intake URL fails to parse. On error
// both returned slices are nil.
func pipelineStatsEndpoints(cfg *config.AgentConfig) (urls []*url.URL, apiKeys []string, err error) {
	if e := cfg.Endpoints; len(e) == 0 || e[0].Host == "" || e[0].APIKey == "" {
		return nil, nil, errors.New("config was not properly validated")
	}
	urls = make([]*url.URL, 0, len(cfg.Endpoints))
	apiKeys = make([]string, 0, len(cfg.Endpoints))
	for _, e := range cfg.Endpoints {
		urlStr := e.Host + pipelineStatsURLSuffix
		// Debugf (not Debug) so that the %s directive is actually expanded.
		log.Debugf("[pipeline_stats] Intake URL %s", urlStr)
		// Use distinct local names: "url" would shadow the net/url package and
		// "err" would shadow the named result.
		parsed, parseErr := url.Parse(urlStr)
		if parseErr != nil {
			return nil, nil, fmt.Errorf("error parsing pipeline stats intake URL %q: %w", urlStr, parseErr)
		}
		urls = append(urls, parsed)
		apiKeys = append(apiKeys, e.APIKey)
	}
	return urls, apiKeys, nil
}
// pipelineStatsProxyHandler builds the HTTP handler that proxies incoming
// requests to the pipeline stats intake. When the intake endpoints cannot be
// resolved from the receiver's configuration, the failure is logged and a
// handler that answers every request with that error is returned instead.
func (r *HTTPReceiver) pipelineStatsProxyHandler() http.Handler {
	log.Debug("[pipeline_stats] Creating proxy handler")

	intakeURLs, keys, err := pipelineStatsEndpoints(r.conf)
	if err != nil {
		log.Errorf("[pipeline_stats] Failed to start pipeline stats proxy handler: %v", err)
		return pipelineStatsErrorHandler(err)
	}

	// Collect the tags as a list and join them once at the end.
	tagList := []string{
		fmt.Sprintf("host:%s,default_env:%s,agent_version:%s", r.conf.Hostname, r.conf.DefaultEnv, r.conf.AgentVersion),
	}
	if r.conf.FargateOrchestrator != config.OrchestratorUnknown {
		orchestrator := strings.ToLower(string(r.conf.FargateOrchestrator))
		tagList = append(tagList, "orchestrator:fargate_"+orchestrator)
	}

	return newPipelineStatsProxy(r.conf, intakeURLs, keys, strings.Join(tagList, ","), r.statsd)
}
// pipelineStatsErrorHandler returns a handler that rejects every request with
// an HTTP 500 response whose body reports that the pipeline stats forwarder is
// off, followed by the given error.
func pipelineStatsErrorHandler(err error) http.Handler {
	reject := func(w http.ResponseWriter, _ *http.Request) {
		http.Error(
			w,
			fmt.Sprintf("Pipeline stats forwarder is OFF: %v", err),
			http.StatusInternalServerError,
		)
	}
	return http.HandlerFunc(reject)
}
// newPipelineStatsProxy builds the httputil.ReverseProxy that forwards
// requests to the pipeline stats intake at urls, pairing urls[i] with
// apiKeys[i]. Every proxied request gets a Via header, the given tags in
// X-Datadog-Additional-Tags and, when container tags can be resolved for the
// request, an X-Datadog-Container-Tags header. One count is sent to statsd
// per proxied request.
func newPipelineStatsProxy(conf *config.AgentConfig, urls []*url.URL, apiKeys []string, tags string, statsd statsd.ClientInterface) *httputil.ReverseProxy {
	log.Debug("[pipeline_stats] Creating reverse proxy")
	idProvider := NewIDProvider(conf.ContainerProcRoot)

	// Proxy errors are throttled to at most 5 messages every 10 seconds.
	throttled := log.NewThrottled(5, 10*time.Second)
	proxy := &httputil.ReverseProxy{
		ErrorLog: stdlog.New(throttled, "pipeline_stats.Proxy: ", 0),
		Transport: &multiDataStreamsTransport{
			rt:      conf.NewHTTPTransport(),
			targets: urls,
			keys:    apiKeys,
		},
	}

	proxy.Director = func(req *http.Request) {
		req.Header.Set("Via", fmt.Sprintf("trace-agent %s", conf.AgentVersion))

		// When the client sent no User-Agent, set it to the empty string so
		// net/http does not fill in its default (Go-http-client/1.1).
		// See https://codereview.appspot.com/7532043
		if _, present := req.Header["User-Agent"]; !present {
			req.Header.Set("User-Agent", "")
		}

		cid := idProvider.GetContainerID(req.Context(), req.Header)
		containerTags := getContainerTags(conf.ContainerTags, cid)
		if containerTags != "" {
			req.Header.Set("X-Datadog-Container-Tags", containerTags)
			log.Debugf("Setting header X-Datadog-Container-Tags=%s for pipeline stats proxy", containerTags)
		}

		req.Header.Set("X-Datadog-Additional-Tags", tags)
		log.Debugf("Setting header X-Datadog-Additional-Tags=%s for pipeline stats proxy", tags)

		// Best-effort metric: a failure to emit it must not affect proxying.
		_ = statsd.Count("datadog.trace_agent.pipelines_stats", 1, nil, 1)
	}

	return proxy
}
// multiDataStreamsTransport sends HTTP requests to multiple targets using an
// underlying http.RoundTripper. API keys are set separately for each target.
// When multiple endpoints are in use the response from the main endpoint
// is proxied back to the client, while for all additional endpoints the
// response is discarded. There is no de-duplication done between endpoint
// hosts or api keys.
type multiDataStreamsTransport struct {
// rt is the underlying round tripper used to send the request to every target.
rt http.RoundTripper
// targets lists the destination URLs; targets[0] is the main endpoint,
// whose response is the one returned by RoundTrip.
targets []*url.URL
// keys holds the API keys; keys[i] is sent to targets[i] in the
// DD-API-KEY header.
keys []string
}
// RoundTrip implements http.RoundTripper by sending req to every configured
// target, after pointing it at the target URL and host and setting the
// target's API key in the DD-API-KEY header.
//
// With a single target the request is forwarded directly and its body is
// streamed rather than buffered. With several targets the body, if any, is
// read fully into memory once and replayed on a clone of the request for each
// target. The response and error of the first (main) target are returned;
// responses from additional targets are drained and closed, and their errors
// are only logged.
//
// An error is returned, rather than a nil response with a nil error, when the
// transport has no targets or fewer API keys than targets.
func (m *multiDataStreamsTransport) RoundTrip(req *http.Request) (rresp *http.Response, rerr error) {
	setTarget := func(r *http.Request, u *url.URL, apiKey string) {
		r.Host = u.Host
		r.URL = u
		r.Header.Set("DD-API-KEY", apiKey)
	}
	// logMain reports the outcome of the request to the main endpoint.
	logMain := func(resp *http.Response, err error) {
		if err != nil {
			log.Errorf("[pipeline_stats] RoundTrip failed: %v", err)
			return
		}
		log.Debugf("[pipeline_stats] Returned status: %s, from host: %s, path: %s", resp.Status, m.targets[0].Host, m.targets[0].Path)
	}

	if len(m.targets) == 0 || len(m.keys) < len(m.targets) {
		// A RoundTripper must return either a response or an error, and must
		// close the request body even when it fails.
		if req.Body != nil {
			req.Body.Close()
		}
		return nil, errors.New("pipeline stats transport is misconfigured: missing targets or API keys")
	}

	if len(m.targets) == 1 {
		setTarget(req, m.targets[0], m.keys[0])
		rresp, rerr = m.rt.RoundTrip(req)
		logMain(rresp, rerr)
		return rresp, rerr
	}

	// httputil.ReverseProxy hands over a nil Body for requests without
	// content, so the body may only be buffered when there is one.
	hasBody := req.Body != nil && req.Body != http.NoBody
	var payload []byte
	if hasBody {
		var err error
		payload, err = io.ReadAll(req.Body)
		// The original body is fully consumed (or broken) at this point; each
		// target gets its own reader over the buffered payload.
		req.Body.Close()
		if err != nil {
			return nil, err
		}
	}

	for i, u := range m.targets {
		newreq := req.Clone(req.Context())
		if hasBody {
			newreq.Body = io.NopCloser(bytes.NewReader(payload))
		}
		setTarget(newreq, u, m.keys[i])
		if i == 0 {
			// given the way we construct the list of targets the main endpoint
			// will be the first one called, we return its response and error
			rresp, rerr = m.rt.RoundTrip(newreq)
			logMain(rresp, rerr)
			continue
		}
		resp, err := m.rt.RoundTrip(newreq)
		if err != nil {
			log.Errorf("[pipeline_stats] RoundTrip to additional endpoint %s failed: %v", u.Host, err)
			continue
		}
		// we discard responses for all subsequent requests; draining before
		// closing lets the underlying connection be reused
		io.Copy(io.Discard, resp.Body) //nolint:errcheck
		resp.Body.Close()
	}
	return rresp, rerr
}